Advanced MobaXterm Techniques and Hands-On Automation Scripts for Linux Operations

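1. MobaXterm Core Strengths and Architecture

1.1 Why Choose MobaXterm for Linux Operations?

# Core components of MobaXterm
+------------------------------------------------+
|       All-in-One Integrated Environment        |
|  +------------------------------------------+  |
|  | SSH client (multi-tab / multi-session)   |  |
|  +------------------------------------------+  |
|  | X11 server (graphical app forwarding)    |  |
|  +------------------------------------------+  |
|  | SFTP file browser (drag-and-drop upload) |  |
|  +------------------------------------------+  |
|  | Macro recorder (automation scripts)      |  |
|  +------------------------------------------+  |
|  | Plugin system (extensions)               |  |
|  +------------------------------------------+  |
+------------------------------------------------+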

1.2 Environment Configuration Tuning

# Key settings in MobaXterm.ini
[MobaXterm]
# Session management
MaxSessions=50
StoreSessionsInRegistry=0
StoreSessionsInFolders=1

# Performance
AntiIdleInterval=60
ScrollbackLines=10000
ShareClipboard=1

# SSH enhancements
SSHKeepAlive=1
Compression=1
CompressionLevel=6

# Appearance
FontName=Consolas
FontSize=11
ColorsScheme=Ubuntu
Transparency=240
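
Before editing settings like the ones above, it can help to see which values would actually change. The following is a minimal sketch, assuming the file parses as a standard INI file; the helper names `read_settings` and `diff_settings` and the sample text are illustrative and not part of MobaXterm.

```python
import configparser

# Illustrative sample only; a real MobaXterm.ini contains many more sections.
SAMPLE_INI = """\
[MobaXterm]
# Session management
MaxSessions=50
ScrollbackLines=10000
FontName=Consolas
"""


def read_settings(ini_text, section="MobaXterm"):
    """Return the key/value pairs of one INI section as a plain dict."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.optionxform = str  # keep key case as written (the default lowercases keys)
    parser.read_string(ini_text)
    return dict(parser[section])


def diff_settings(current, wanted):
    """Return only the keys whose value would change, as {key: (old, new)}."""
    return {
        key: (current.get(key), value)
        for key, value in wanted.items()
        if current.get(key) != value
    }


if __name__ == "__main__":
    current = read_settings(SAMPLE_INI)
    print(diff_settings(current, {"ScrollbackLines": "5000", "FontName": "Consolas"}))
```

Reviewing the diff first, and keeping a copy of the original file, makes it easy to roll back a setting that turns out not to be recognized by the installed version.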

2. Advanced SSH Session Management

2.1 Managing Servers in Bulk

#!/bin/bash
# moba_batch_manager.sh
# Batch server configuration and operations script
# Assumes key-based SSH authentication is already set up for every server,
# so no password has to be stored in the script.

SERVERS=(
    "user1@192.168.1.10:22"
    "user2@192.168.1.20:22"
    "root@192.168.1.30:2222"
)

# Color definitions for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Split a "user@host:port" entry into the globals user, host and port
parse_server() {
    local hostport="${1#*@}"
    user="${1%%@*}"
    host="${hostport%%:*}"
    port=22
    [[ "$hostport" == *:* ]] && port="${hostport##*:}"
    return 0
}

# Run one command on every server
batch_execute() {
    local command="$1"
    local server user host port
    echo -e "${YELLOW}[batch run] ${command}${NC}"
    echo "========================================"

    for server in "${SERVERS[@]}"; do
        parse_server "$server"
        echo -e "\n${GREEN}>>> Connecting to ${host}${NC}"

        # Use the OpenSSH client available in MobaXterm's local terminal.
        # -n keeps ssh from reading the script's stdin; BatchMode fails fast
        # instead of prompting when key authentication is not possible.
        if ssh -n -p "$port" -o ConnectTimeout=30 -o BatchMode=yes \
               "${user}@${host}" "$command"; then
            echo -e "${GREEN}${host}: succeeded${NC}"
        else
            echo -e "${RED}${host}: failed${NC}"
        fi
    done
}

# Distribute a local file to every server
batch_scp() {
    local local_file="$1"
    local remote_path="$2"
    local server user host port

    for server in "${SERVERS[@]}"; do
        parse_server "$server"
        echo -e "\n${YELLOW}Copying file to ${host}${NC}"

        # scp takes the port with an uppercase -P
        scp -P "$port" -o ConnectTimeout=30 -o BatchMode=yes \
            "$local_file" "${user}@${host}:${remote_path}"
    done
}

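# Main menu
main_menu() {
    while true; do
        clear
        echo "=== MobaXterm Batch Operations ==="
        echo "1. Run a command on all servers"
        echo "2. Distribute a file to all servers"
        echo "3. Collect logs from all servers"
        echo "4. Check service status on all servers"
        echo "5. Update all systems"
        echo "6. Quit"
        echo -n "Choose an option: "

        read -r choice
        case $choice in
            1)
                echo -n "Command to run: "
                read -r cmd
                batch_execute "$cmd"
                ;;
            2)
                echo -n "Local file path: "
                read -r local_file
                echo -n "Remote path: "
                read -r remote_path
                batch_scp "$local_file" "$remote_path"
                ;;
            3)
                # Archive the logs on each server, then download the archive.
                # (Collecting logs means copying from the servers, not to them.)
                archive="/tmp/logs_$(date +%Y%m%d).tar.gz"
                batch_execute "tar -czf ${archive} /var/log"
                mkdir -p /backup/logs
                for server in "${SERVERS[@]}"; do
                    hostport="${server#*@}"
                    host="${hostport%%:*}"
                    port=22
                    [[ "$hostport" == *:* ]] && port="${hostport##*:}"
                    scp -P "$port" "${server%%:*}:${archive}" \
                        "/backup/logs/${host}_$(basename "$archive")"
                done
                ;;
            4)
                batch_execute "systemctl list-units --type=service --state=running"
                ;;
            5)
                batch_execute "apt update && apt upgrade -y"
                ;;
            6)
                exit 0
                ;;
            *)
                echo "Invalid choice"
                ;;
        esac

        echo -e "\nPress any key to continue..."
        read -r -n1
    done
}

# Start
main_menu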
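
The server list above packs user, host and port into one string. As a minimal sketch of the same parsing in Python, assuming entries of the form `user@host[:port]` (the `Server` type and `parse_server` function are illustrative names, not part of the script above):

```python
from typing import NamedTuple


class Server(NamedTuple):
    user: str
    host: str
    port: int


def parse_server(entry: str, default_port: int = 22) -> Server:
    """Split 'user@host[:port]' into its parts, defaulting the port to 22."""
    user, sep, rest = entry.partition("@")
    if not sep or not user:
        raise ValueError(f"missing user in {entry!r}")
    host, _, port_text = rest.partition(":")
    if not host:
        raise ValueError(f"missing host in {entry!r}")
    port = int(port_text) if port_text else default_port
    return Server(user, host, port)


if __name__ == "__main__":
    for entry in ["user1@192.168.1.10:22", "root@192.168.1.30:2222", "ops@10.0.0.5"]:
        print(parse_server(entry))
```

Validating entries up front means a malformed line is reported once, before any connection is attempted, rather than surfacing as a confusing SSH error halfway through a batch run.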

2.2 Advanced SSH Tunneling and Port Forwarding

# MobaXterm tunnel configuration file (tunnels.xml)
<tunnels>
    <tunnel name="MySQL remote access" enabled="true">
        <type>Local</type>
        <listen>127.0.0.1:3307</listen>
        <target>db-server:3306</target>
        <server>jump-host</server>
        <user>admin</user>
        <auth>password</auth>
    </tunnel>
    
    <tunnel name="Web admin console" enabled="true">
        <type>Dynamic</type>
        <listen>1080</listen>
        <server>bastion-host</server>
        <user>proxy-user</user>
        <auth>keyfile:C:\Users\user\.ssh\id_rsa</auth>
    </tunnel>
    
    <tunnel name="K8s Dashboard" enabled="true">
        <type>Local</type>
        <listen>127.0.0.1:8001</listen>
        <target>kubernetes-dashboard:443</target>
        <server>k8s-master</server>
        <user>k8s-admin</user>
        <auth>password</auth>
    </tunnel>
</tunnels>
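
Each tunnel definition above corresponds to a plain OpenSSH forwarding option: `-L` for a local forward and `-D` for a dynamic (SOCKS) forward. The sketch below builds the equivalent `ssh` argument list from the same fields; the function name `build_ssh_tunnel_command` is hypothetical, and the field names simply mirror the XML elements shown above.

```python
def build_ssh_tunnel_command(kind, listen, server, user, target=None):
    """Return the argv list for an OpenSSH tunnel.

    kind   -- "Local" (ssh -L) or "Dynamic" (ssh -D), as in the <type> element
    listen -- local bind address and/or port, e.g. "127.0.0.1:3307" or "1080"
    target -- "host:port" reached from the SSH server (Local tunnels only)
    """
    if kind == "Local":
        if not target:
            raise ValueError("a Local tunnel needs a target")
        forward = ["-L", f"{listen}:{target}"]
    elif kind == "Dynamic":
        forward = ["-D", listen]
    else:
        raise ValueError(f"unsupported tunnel type: {kind!r}")
    # -N: do not run a remote command, only forward ports
    return ["ssh", "-N", *forward, f"{user}@{server}"]


if __name__ == "__main__":
    print(" ".join(build_ssh_tunnel_command(
        "Local", "127.0.0.1:3307", "jump-host", "admin", target="db-server:3306")))
    print(" ".join(build_ssh_tunnel_command(
        "Dynamic", "1080", "bastion-host", "proxy-user")))
```

Returning a list rather than a single string lets the result be passed straight to `subprocess.run` without shell quoting issues.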

# Automated tunnel management script
# manage_tunnels.ps1
param(
    [string]$Action = "start",
    [string]$TunnelName = "all"
)

$MobaPath = "C:\Program Files (x86)\Mobatek\MobaXterm"
$TunnelConfig = Join-Path $MobaPath "tunnels.xml"

function Start-Tunnel {
    param([string]$Name)

    # Start the tunnel through the MobaXterm command line.
    # The switches are kept as in the original script; check them against
    # the command-line options documented for your MobaXterm version.
    & "$MobaPath\MobaXterm.exe" `
        -newinstance `
        -hideterm `
        -bookmark2 "tunnel://$Name"

    Write-Host "Tunnel $Name started" -ForegroundColor Green
}

function Stop-Tunnel {
    param([string]$Name)

    # Find and terminate the MobaXterm process that owns the tunnel
    $processes = Get-Process | Where-Object {
        $_.ProcessName -eq "MobaXterm" -and
        $_.MainWindowTitle -like "*$Name*"
    }

    foreach ($proc in $processes) {
        $proc.Kill()
        Write-Host "Tunnel $Name stopped" -ForegroundColor Yellow
    }
}

function Monitor-Tunnels {
    # Poll tunnel status every 5 seconds
    while ($true) {
        Clear-Host
        Write-Host "=== Tunnel Status Monitor ===" -ForegroundColor Cyan
        Write-Host "Time: $(Get-Date -Format 'HH:mm:ss')"
        Write-Host ""

        # Collect the local ports that are currently listening
        $listening = netstat -an | Select-String "LISTENING"

        $tunnels = [xml](Get-Content $TunnelConfig)
        foreach ($tunnel in $tunnels.tunnels.tunnel) {
            # <listen> is either "address:port" or just "port";
            # index -1 takes the last element in both cases
            $port = $tunnel.listen.Split(':')[-1]
            $isUp = [bool]($listening | Select-String ":$port\s")
            $status = if ($isUp) { "✓ running" } else { "✗ stopped" }
            $color = if ($isUp) { "Green" } else { "Red" }

            Write-Host "$($tunnel.name) [$($tunnel.type)]" -NoNewline
            Write-Host " - $status" -ForegroundColor $color
        }

        Start-Sleep -Seconds 5
    }
}

# Dispatch the requested action
switch ($Action) {
    "start" {
        if ($TunnelName -eq "all") {
            $tunnels = [xml](Get-Content $TunnelConfig)
            foreach ($tunnel in $tunnels.tunnels.tunnel) {
                if ($tunnel.enabled -eq "true") {
                    Start-Tunnel -Name $tunnel.name
                }
            }
        } else {
            Start-Tunnel -Name $TunnelName
        }
    }
    "stop" {
        if ($TunnelName -eq "all") {
            # "all" is not a window title, so stop each configured tunnel by name
            $tunnels = [xml](Get-Content $TunnelConfig)
            foreach ($tunnel in $tunnels.tunnels.tunnel) {
                Stop-Tunnel -Name $tunnel.name
            }
        } else {
            Stop-Tunnel -Name $TunnelName
        }
    }
    "monitor" {
        Monitor-Tunnels
    }
    default {
        Write-Host "Usage: manage_tunnels.ps1 [start|stop|monitor] [tunnel name|all]" -ForegroundColor Yellow
    }
}
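
The monitor above infers tunnel health from `netstat` output. An alternative check, sketched here under the assumption that the tunnel listens on the loopback interface, is to try a TCP connection to the forwarded port; `is_port_listening` is an illustrative helper name.

```python
import socket


def is_port_listening(port, host="127.0.0.1", timeout=1.0):
    """Return True when something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success and an error code otherwise
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    for name, port in [("MySQL remote access", 3307), ("K8s Dashboard", 8001)]:
        state = "running" if is_port_listening(port) else "stopped"
        print(f"{name}: {state}")
```

A successful connection only shows that the local end is listening; it does not prove the remote target behind the tunnel is reachable.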

3. Automation Script Development in Practice

3.1 Recording MobaXterm Macros and Converting Them

#!/usr/bin/env python3
# moba_macro_converter.py
# Convert MobaXterm macros into Python scripts

import re
import json
import argparse
from datetime import datetime

class MobaMacroConverter:
    def __init__(self, macro_file):
        self.macro_file = macro_file
        self.commands = []
        
    def parse_macro(self):
        """Parse the MobaXterm macro file."""
        with open(self.macro_file, 'r', encoding='utf-8') as f:
            content = f.read()
        
        # Extract the command sequence
        pattern = r'<command>(.*?)</command>'
        self.commands = re.findall(pattern, content, re.DOTALL)
        
        return self.commands
    
    def convert_to_python(self, output_file=None):
        """Convert the macro into a Python automation script."""
        template = '''#!/usr/bin/env python3
# Auto-generated MobaXterm operations script
# Generated at: {timestamp}
# Source macro file: {macro_file}

import paramiko
import time
import sys
import os

class MobaAutomation:
    def __init__(self, host, username, password=None, key_file=None):
        self.host = host
        self.username = username
        self.password = password
        self.key_file = key_file
        self.client = None
        self.sftp = None
        
    def connect(self):
        """Open the SSH connection and an SFTP channel."""
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        
        try:
            if self.key_file:
                # key_filename lets paramiko detect the key type, so the
                # script is not limited to RSA keys
                self.client.connect(
                    self.host, 
                    username=self.username,
                    key_filename=os.path.expanduser(self.key_file)
                )
            else:
                self.client.connect(
                    self.host,
                    username=self.username,
                    password=self.password
                )
            
            self.sftp = self.client.open_sftp()
            print(f"[✓] Connected to {{self.host}}")
            return True
            
        except Exception as e:
            print(f"[✗] Connection failed: {{e}}")
            return False
    
    def execute_command(self, command, timeout=30):
        """Run one command and return its exit code, stdout and stderr."""
        print(f"[→] Running: {{command}}")
        
        stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
        
        # Read the output before asking for the exit status: waiting for the
        # status first can stall on commands that print a lot
        output = stdout.read().decode('utf-8', errors='ignore')
        error = stderr.read().decode('utf-8', errors='ignore')
        exit_code = stdout.channel.recv_exit_status()
        
        if exit_code != 0:
            print(f"[!] Command failed (exit={{exit_code}})")
            if error:
                print(f"stderr: {{error}}")
        else:
            print(f"[✓] Command succeeded")
        
        return {{
            'exit_code': exit_code,
            'output': output,
            'error': error
        }}
    
    def upload_file(self, local_path, remote_path):
        """Upload a file."""
        try:
            self.sftp.put(local_path, remote_path)
            print(f"[✓] Uploaded: {{local_path}} -> {{remote_path}}")
            return True
        except Exception as e:
            print(f"[✗] Upload failed: {{e}}")
            return False
    
    def download_file(self, remote_path, local_path):
        """Download a file."""
        try:
            self.sftp.get(remote_path, local_path)
            print(f"[✓] Downloaded: {{remote_path}} -> {{local_path}}")
            return True
        except Exception as e:
            print(f"[✗] Download failed: {{e}}")
            return False
    
    def close(self):
        """Close the connection."""
        if self.sftp:
            self.sftp.close()
        if self.client:
            self.client.close()
        print("[✓] Connection closed")

# Automation task
def main():
    # Connection settings
    host = "your-server.com"
    username = "your-username"
    password = "your-password"  # or use a key file
    
    # Create the automation instance
    automator = MobaAutomation(host, username, password)
    
    if not automator.connect():
        sys.exit(1)
    
    try:
        # Auto-generated command sequence
{commands}
        
    finally:
        automator.close()

if __name__ == "__main__":
    main()
'''
        
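        # Convert the commands.
        # The !r conversion (repr) emits a valid Python string literal, so
        # quotes, backslashes and newlines inside a command cannot break the
        # generated script.
        py_commands = []
        for i, cmd in enumerate(self.commands, 1):
            cmd = cmd.strip()
            if not cmd:
                continue
                
            # Special commands
            if cmd.startswith('upload '):
                parts = cmd.split(' ', 2)
                if len(parts) >= 3:
                    py_commands.append(f'        # Command {i}: {cmd!r}')
                    py_commands.append(f'        automator.upload_file({parts[1]!r}, {parts[2]!r})')
            elif cmd.startswith('download '):
                parts = cmd.split(' ', 2)
                if len(parts) >= 3:
                    py_commands.append(f'        # Command {i}: {cmd!r}')
                    py_commands.append(f'        automator.download_file({parts[1]!r}, {parts[2]!r})')
            else:
                # Ordinary command: stop at the first failure
                py_commands.append(f'        # Command {i}: {cmd!r}')
                py_commands.append(f'        result = automator.execute_command({cmd!r})')
                py_commands.append('        if result["exit_code"] != 0:')
                py_commands.append('            print("Task failed, stopping")')
                # return, not break: the generated code is not inside a loop,
                # and the finally block in main() still closes the connection
                py_commands.append('            return')
                py_commands.append('        time.sleep(1)')
        
        # Build the final script; "pass" keeps the try block valid when the
        # macro contained no usable commands
        final_script = template.format(
            timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            macro_file=self.macro_file,
            commands='\n'.join(py_commands or ['        pass'])
        )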
        
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(final_script)
            print(f"[✓] Python script written to: {output_file}")
        else:
            print(final_script)
        
        return final_script

def main():
    parser = argparse.ArgumentParser(description='MobaXterm macro conversion tool')
    parser.add_argument('macro', help='path to the MobaXterm macro file')
    parser.add_argument('-o', '--output', help='path of the Python file to write')
    parser.add_argument('-p', '--parse-only', action='store_true', 
                       help='only parse the macro file, do not convert it')
    
    args = parser.parse_args()
    
    converter = MobaMacroConverter(args.macro)
    commands = converter.parse_macro()
    
    if args.parse_only:
        print("Parsed commands:")
        for i, cmd in enumerate(commands, 1):
            print(f"{i:3d}. {cmd}")
    else:
        converter.convert_to_python(args.output)

if __name__ == "__main__":
    main()
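
A code generator like the converter above has to embed arbitrary shell commands in Python source. One way to do that safely, sketched here with the hypothetical helpers `render_call` and `embedded_argument`, is to let `repr()` produce the string literal and then confirm with the `ast` module that the generated line parses back to the original command.

```python
import ast


def render_call(command: str) -> str:
    """Render one generated source line that passes `command` as a string literal."""
    # !r applies repr(), which escapes quotes, backslashes and newlines
    return f"result = automator.execute_command({command!r})"


def embedded_argument(line: str) -> str:
    """Parse a generated line and return the string literal it would pass."""
    call = ast.parse(line).body[0].value  # the Call node on the right-hand side
    return ast.literal_eval(call.args[0])


if __name__ == "__main__":
    tricky = 'echo "it\'s {done}" \\ && ls'
    line = render_call(tricky)
    print(line)
    print(embedded_argument(line) == tricky)
```

Round-tripping through `ast.parse` is a cheap self-check that the generator can run on every emitted line before writing the script to disk.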

3.2 An Advanced Operations Automation Framework

#!/usr/bin/env python3
# moba_automation_framework.py
# Operations automation framework built around MobaXterm

import os
import sys
import json
import yaml
import logging
import paramiko
import threading
import queue
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

class MobaAutomationFramework:
    """MobaXterm自动化框架"""
    
    def __init__(self, config_file='config.yaml'):
        self.config = self.load_config(config_file)
        self.setup_logging()
        self.servers = self.config.get('servers', [])
        self.tasks = self.config.get('tasks', {})
        
    def load_config(self, config_file):
        """加载配置文件"""
        config_path = Path(config_file)
        
        if not config_path.exists():
            # 创建默认配置
            default_config = {
                'servers': [
                    {
                        'name': 'web-server-1',
                        'host': '192.168.1.100',
                        'port': 22,
                        'username': 'admin',
                        'password': '',  # 或使用 key_file
                        'key_file': '~/.ssh/id_rsa',
                        'groups': ['web', 'production']
                    }
                ],
                'tasks': {
                    'system_health_check': {
                        'description': '系统健康检查',
                        'commands': [
                            'uptime',
                            'free -h',
                            'df -h',
                            'top -bn1 | head -20'
                        ],
                        'parallel': True
                    },
                    'deploy_application': {
                        'description': '部署应用',
                        'steps': [
                            {'type': 'upload', 'src': 'app.tar.gz', 'dest': '/tmp/'},
                            {'type': 'command', 'cmd': 'tar -xzf /tmp/app.tar.gz -C /opt/app'},
                            {'type': 'command', 'cmd': 'systemctl restart app-service'}
                        ]
                    }
                }
            }
            
            with open(config_file, 'w') as f:
                yaml.dump(default_config, f, default_flow_style=False)
            
            return default_config
        
        with open(config_file, 'r') as f:
            return yaml.safe_load(f)
    
    def setup_logging(self):
        """配置日志"""
        log_dir = Path('logs')
        log_dir.mkdir(exist_ok=True)
        
        log_file = log_dir / f'automation_{datetime.now().strftime("%Y%m%d")}.log'
        
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        
        self.logger = logging.getLogger(__name__)
    
    def execute_on_server(self, server, task):
        """Run one task on a single server and return a result dict."""
        results = {'server': server['name'], 'success': False, 'output': ''}
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        def run(cmd):
            """Run cmd, record its output, and return True on exit code 0."""
            stdin, stdout, stderr = client.exec_command(cmd)
            # Read the output before the exit status; waiting for the status
            # first can stall on commands that print a lot
            output = stdout.read().decode(errors='replace')
            error = stderr.read().decode(errors='replace')
            exit_code = stdout.channel.recv_exit_status()
            results['output'] += f"$ {cmd}\n{output}\n"
            if exit_code != 0:
                self.logger.error(f"Command failed: {cmd}, error: {error}")
                results['output'] += f"Error: {error}\n"
            return exit_code == 0

        try:
            # Connect to the server
            connect_args = {
                'port': server.get('port', 22),
                'username': server['username'],
            }
            if server.get('key_file'):
                # key_filename lets paramiko detect the key type (not only RSA)
                connect_args['key_filename'] = os.path.expanduser(server['key_file'])
            else:
                connect_args['password'] = server.get('password')
            client.connect(server['host'], **connect_args)

            self.logger.info(f"Starting task on {server['name']}")
            ok = True

            # Plain command list
            if 'commands' in task:
                for cmd in task['commands']:
                    if not run(cmd):
                        ok = False
                        break

            # Step list (commands and uploads)
            elif 'steps' in task:
                for step in task['steps']:
                    if step['type'] == 'command':
                        if not run(step['cmd']):
                            ok = False
                            break

                    elif step['type'] == 'upload':
                        dest = step['dest']
                        if dest.endswith('/'):
                            # SFTP put needs a full remote file path, not a directory
                            dest += os.path.basename(step['src'])
                        sftp = client.open_sftp()
                        try:
                            sftp.put(step['src'], dest)
                        finally:
                            sftp.close()
                        self.logger.info(f"Uploaded: {step['src']} -> {dest}")

            # A failed command must not be reported as success
            results['success'] = ok
            self.logger.info(f"Task on {server['name']} finished (success={ok})")

        except Exception as e:
            self.logger.error(f"Server {server['name']} failed: {e}")
            results['output'] += f"Error: {e}\n"
        finally:
            client.close()

        return results
    
    def run_task(self, task_name, server_filter=None):
        """运行任务"""
        if task_name not in self.tasks:
            self.logger.error(f"任务不存在: {task_name}")
            return
        
        task = self.tasks[task_name]
        self.logger.info(f"开始执行任务: {task_name} - {task.get('description', '')}")
        
        # 筛选服务器
        target_servers = self.servers
        if server_filter:
            if isinstance(server_filter, list):
                target_servers = [s for s in self.servers if s['name'] in server_filter]
            elif isinstance(server_filter, dict):
                # 按组筛选
                if 'groups' in server_filter:
                    target_servers = [
                        s for s in self.servers 
                        if any(g in s.get('groups', []) for g in server_filter['groups'])
                    ]
        
        results = []
        
        # 并行执行
        if task.get('parallel', False):
            with ThreadPoolExecutor(max_workers=10) as executor:
                future_to_server = {
                    executor.submit(self.execute_on_server, server, task): server
                    for server in target_servers
                }
                
                for future in as_completed(future_to_server):
                    server = future_to_server[future]
                    try:
                        result = future.result()
                        results.append(result)
                    except Exception as e:
                        self.logger.error(f"执行异常: {e}")
                        results.append({
                            'server': server['name'],
                            'success': False,
                            'output': str(e)
                        })
        else:
            # 串行执行
            for server in target_servers:
                result = self.execute_on_server(server, task)
                results.append(result)
        
        # 生成报告
        self.generate_report(task_name, results)
        
        # 统计结果
        success_count = sum(1 for r in results if r['success'])
        total_count = len(results)
        
        self.logger.info(f"任务完成: {success_count}/{total_count} 服务器成功")
        
        return results
    
    def generate_report(self, task_name, results):
        """生成执行报告"""
        report_dir = Path('reports')
        report_dir.mkdir(exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_file = report_dir / f'report_{task_name}_{timestamp}.html'
        
        # 生成HTML报告
        html = f'''
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="UTF-8">
            <title>运维自动化报告 - {task_name}</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .summary {{ background: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 20px; }}
                .success {{ color: green; }}
                .failed {{ color: red; }}
                .server {{ border: 1px solid #ddd; margin: 10px 0; padding: 10px; border-radius: 3px; }}
                pre {{ background: #f8f8f8; padding: 10px; border-radius: 3px; overflow: auto; }}
            </style>
        </head>
        <body>
            <h1>运维自动化报告</h1>
            <div class="summary">
                <h2>任务: {task_name}</h2>
                <p>执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
                <p>服务器总数: {len(results)}</p>
                <p>成功: <span class="success">{sum(1 for r in results if r['success'])}</span></p>
                <p>失败: <span class="failed">{sum(1 for r in results if not r['success'])}</span></p>
            </div>
        '''
        
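        # Imported locally because the name `html` holds the report string here
        from html import escape

        for result in results:
            status_class = 'success' if result['success'] else 'failed'
            status_text = 'Success' if result['success'] else 'Failed'
            
            # Escape command output so characters such as < and & cannot break the page
            html += f'''
            <div class="server">
                <h3>Server: {escape(result['server'])} - <span class="{status_class}">{status_text}</span></h3>
                <pre>{escape(result.get('output') or 'No output')}</pre>
            </div>
            '''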
        
        html += '''
        </body>
        </html>
        '''
        
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(html)
        
        self.logger.info(f"报告已生成: {report_file}")
        
        return report_file

def main():
    """CLI入口点"""
    import argparse
    
    parser = argparse.ArgumentParser(description='MobaXterm自动化运维框架')
    parser.add_argument('task', help='要执行的任务名称')
    parser.add_argument('-c', '--config', default='config.yaml', help='配置文件路径')
    parser.add_argument('-s', '--servers', nargs='+', help='指定服务器列表')
    parser.add_argument('-g', '--groups', nargs='+', help='按服务器组筛选')
    
    args = parser.parse_args()
    
    # 初始化框架
    framework = MobaAutomationFramework(args.config)
    
    # 构建服务器筛选条件
    server_filter = None
    if args.servers:
        server_filter = args.servers
    elif args.groups:
        server_filter = {'groups': args.groups}
    
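    # Run the task and report failure through the exit code, so callers
    # such as CI jobs can react to a failed run
    results = framework.run_task(args.task, server_filter)
    if not results or not all(r['success'] for r in results):
        sys.exit(1)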

if __name__ == "__main__":
    main()
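
The server selection inside `run_task` accepts either a list of names or a group filter. The standalone sketch below restates that rule as a pure function so it can be tested without any SSH connection; `filter_servers` and the sample inventory are illustrative and not part of the framework above.

```python
def filter_servers(servers, names=None, groups=None):
    """Select servers by explicit name, else by group membership, else all."""
    if names:
        wanted_names = set(names)
        return [s for s in servers if s["name"] in wanted_names]
    if groups:
        wanted_groups = set(groups)
        # keep a server when it belongs to at least one requested group
        return [s for s in servers if wanted_groups & set(s.get("groups", []))]
    return list(servers)


if __name__ == "__main__":
    inventory = [
        {"name": "web-server-1", "groups": ["web", "production"]},
        {"name": "db-server-1", "groups": ["db", "production"]},
        {"name": "test-box"},
    ]
    print([s["name"] for s in filter_servers(inventory, groups=["web"])])
    print([s["name"] for s in filter_servers(inventory, names=["test-box"])])
```

As in the framework's CLI, an explicit name list takes precedence over a group filter when both are given.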

4. Integration and Monitoring

4.1 Integrating with CI/CD Pipelines

# .gitlab-ci.yml example
stages:
  - deploy
  - verify
  - rollback

variables:
  MOBA_CONFIG: "production_servers.yaml"

deploy_production:
  stage: deploy
  script:
    - echo "Deploying to production"
    - python moba_automation_framework.py deploy_application -c $MOBA_CONFIG -g production
  only:
    - master
  artifacts:
    paths:
      - reports/*.html

health_check:
  stage: verify
  script:
    - echo "Running health check"
    - python moba_automation_framework.py system_health_check -c $MOBA_CONFIG -g production
  dependencies:
    - deploy_production
  artifacts:
    paths:
      - reports/*.html

# The rollback_application task must be defined under "tasks" in the config file;
# the default configuration generated by the framework does not include it.
rollback_if_failed:
  stage: rollback
  script:
    - echo "Deployment failure detected, rolling back"
    - python moba_automation_framework.py rollback_application -c $MOBA_CONFIG -g production
  when: on_failure
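
The pipeline above calls three task names, and a name missing from the configuration would only be noticed when that job runs. A small pre-flight check, sketched here with a hypothetical `missing_tasks` helper and an in-memory config shaped like the framework's YAML, can list undefined tasks up front.

```python
def missing_tasks(config, referenced):
    """Return the referenced task names that the config does not define."""
    defined = set(config.get("tasks", {}))
    return [name for name in referenced if name not in defined]


if __name__ == "__main__":
    # Mirrors the structure of the framework's default config (tasks keyed by name)
    config = {
        "tasks": {
            "system_health_check": {"commands": ["uptime"]},
            "deploy_application": {"steps": []},
        }
    }
    pipeline_tasks = ["deploy_application", "system_health_check", "rollback_application"]
    print(missing_tasks(config, pipeline_tasks))
```

Running such a check as the first pipeline step turns a silent "task not found" at rollback time into an early, visible failure.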

4.2 Real-Time Monitoring Dashboard

#!/usr/bin/env python3
# moba_monitor_dashboard.py
# Real-time monitoring dashboard

import os
import threading
import time
from datetime import datetime

import paramiko
from flask import Flask, render_template, jsonify

from moba_automation_framework import MobaAutomationFramework

app = Flask(__name__)
framework = MobaAutomationFramework()

class ServerMonitor:
    def __init__(self, update_interval=60):
        self.update_interval = update_interval
        self.monitor_data = {}
        self.running = False
        
    def start_monitoring(self):
        """Start the monitoring thread."""
        self.running = True
        monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        monitor_thread.start()
    
    def _monitor_loop(self):
        """Monitoring loop."""
        while self.running:
            try:
                self.update_monitor_data()
            except Exception as e:
                print(f"Monitoring error: {e}")
            # Sleep even after an error so a persistent failure cannot spin the loop
            time.sleep(self.update_interval)
    
    def update_monitor_data(self):
        """Refresh the monitoring data for every server."""
        for server in framework.servers:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:
                # Connect to the server to read its status
                connect_args = {
                    'port': server.get('port', 22),
                    'username': server['username'],
                }
                if server.get('key_file'):
                    # key_filename lets paramiko detect the key type (not only RSA)
                    connect_args['key_filename'] = os.path.expanduser(server['key_file'])
                else:
                    connect_args['password'] = server.get('password')
                client.connect(server['host'], **connect_args)
                
                # Commands used to sample system status
                commands = {
                    'cpu': "top -bn1 | grep 'Cpu(s)'",
                    'memory': "free -m | grep Mem",
                    'disk': "df -h / | tail -1",
                    'uptime': "uptime",
                    'connections': "netstat -an | grep ESTABLISHED | wc -l"
                }
                
                server_data = {'timestamp': datetime.now().isoformat()}
                
                for key, cmd in commands.items():
                    stdin, stdout, stderr = client.exec_command(cmd)
                    server_data[key] = stdout.read().decode().strip()
                
                # Compute the health score
                server_data['health_score'] = self.calculate_health_score(server_data)
                self.monitor_data[server['name']] = server_data
                
            except Exception as e:
                self.monitor_data[server['name']] = {
                    'error': str(e),
                    'health_score': 0,
                    'timestamp': datetime.now().isoformat()
                }
            finally:
                client.close()
    
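    def calculate_health_score(self, data):
        """Compute a 0-100 health score from CPU and memory usage."""
        score = 100
        
        # CPU usage, derived from the idle field of the top summary line,
        # e.g. "%Cpu(s):  3.1 us,  1.2 sy,  0.0 ni, 95.2 id, ..."
        if 'cpu' in data:
            try:
                for field in data['cpu'].split(','):
                    parts = field.split()
                    if parts and parts[-1] == 'id':
                        cpu_usage = 100.0 - float(parts[-2])
                        # Check the higher threshold first, otherwise it is unreachable
                        if cpu_usage > 95:
                            score -= 50
                        elif cpu_usage > 80:
                            score -= 20
                        break
            except (ValueError, IndexError):
                pass
        
        # Memory usage, from "Mem: total used free ..." (values in MB)
        if 'memory' in data:
            try:
                mem_parts = data['memory'].split()
                total = int(mem_parts[1])
                used = int(mem_parts[2])
                mem_usage = (used / total) * 100
                if mem_usage > 95:
                    score -= 50
                elif mem_usage > 80:
                    score -= 20
            except (ValueError, IndexError, ZeroDivisionError):
                pass
        
        return max(0, score)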

monitor = ServerMonitor()
monitor.start_monitoring()

@app.route('/')
def dashboard():
    """主监控页面"""
    return render_template('dashboard.html')

@app.route('/api/monitor')
def get_monitor_data():
    """获取监控数据API"""
    return jsonify(monitor.monitor_data)

@app.route('/api/servers')
def get_servers():
    """获取服务器列表"""
    servers_info = []
    for server in framework.servers:
        server_info = {
            'name': server['name'],
            'host': server['host'],
            'groups': server.get('groups', []),
            'last_status': monitor.monitor_data.get(server['name'], {})
        }
        servers_info.append(server_info)
    
    return jsonify(servers_info)

@app.route('/api/task/run/<task_name>')
def run_task(task_name):
    """运行任务API"""
    try:
        results = framework.run_task(task_name)
        return jsonify({
            'success': True,
            'results': results
        })
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

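if __name__ == '__main__':
    # Flask's debug mode exposes an interactive debugger, so keep it off and
    # bind to localhost unless the dashboard sits behind proper access control
    app.run(debug=False, host='127.0.0.1', port=5000)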
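
The health score depends on turning raw `top` and `free -m` lines into percentages. The sketch below isolates that parsing so it can be checked against sample lines; the function names are illustrative, and the parsing assumes procps-style output with dot decimals, which may differ across distributions and locales.

```python
import re


def cpu_usage_percent(top_line):
    """Return CPU usage as 100 minus the idle figure of a top summary line."""
    match = re.search(r"([\d.]+)\s*%?\s*id", top_line)
    if match is None:
        return None
    return round(100.0 - float(match.group(1)), 1)


def memory_usage_percent(free_line):
    """Return used/total in percent from a 'Mem:' line of `free -m`."""
    parts = free_line.split()
    total, used = int(parts[1]), int(parts[2])
    return round(used / total * 100, 1)


def health_score(cpu, mem):
    """Subtract 50 points above 95% usage and 20 points above 80%, per metric."""
    score = 100
    for usage in (cpu, mem):
        if usage is None:
            continue
        if usage > 95:
            score -= 50
        elif usage > 80:
            score -= 20
    return max(0, score)


if __name__ == "__main__":
    top_line = "%Cpu(s):  3.1 us,  1.2 sy,  0.0 ni, 95.2 id,  0.3 wa,  0.0 hi,  0.2 si,  0.0 st"
    free_line = "Mem:           7821        3120        1203         211        3497        4200"
    cpu = cpu_usage_percent(top_line)
    mem = memory_usage_percent(free_line)
    print(cpu, mem, health_score(cpu, mem))
```

Keeping the parsers separate from the SSH code makes it straightforward to add sample lines from other systems as regression cases.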

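5. Security and Best Practices

5.1 Security Configuration Guide

#!/bin/bash
# security_hardening.sh
# MobaXterm security hardening script

echo "Starting MobaXterm security hardening..."

# 1. Tighten configuration file permissions
chmod 600 ~/MobaXterm.ini
chmod 700 ~/MobaXterm/sessions/

# 2. Session encryption
echo "Enabling session encryption..."
cat > ~/MobaXterm/sec_config.ini << EOF
[Security]
EncryptPasswords=1
EncryptionKey=$(openssl rand -base64 32)
SessionTimeout=300
MaxFailedAttempts=3
EOF

# 3. SSH key management
echo "Setting up SSH keys..."
mkdir -p ~/.ssh
chmod 700 ~/.ssh

# Generate a new SSH key pair unless one already exists.
# -N "" creates the key without a passphrase; drop it to be prompted for one.
[ -f ~/.ssh/moba_key ] || ssh-keygen -t ed25519 -f ~/.ssh/moba_key -N "" -q

# 4. Restrict SSH agent forwarding
# Append rather than overwrite, so existing client settings are preserved.
# UseKeychain (macOS only) and the sshd_config options PermitRootLogin and
# MaxAuthTries are left out: they are not valid in a client ssh_config here.
cat >> ~/.ssh/config << EOF
Host *
    ForwardAgent no
    AddKeysToAgent yes
    ControlMaster auto
    ControlPath ~/.ssh/%r@%h:%p
    ControlPersist 600
    
# Strict settings for production servers
Host production-*
    PasswordAuthentication no
    PubkeyAuthentication yes
EOF
chmod 600 ~/.ssh/config

# 5. Enable MobaXterm auto-lock
cat >> ~/MobaXterm.ini << EOF
[Security]
AutoLock=1
LockTimeout=300
RequirePasswordOnResume=1
EOF

# 6. Audit logging
echo "Enabling audit logging..."
mkdir -p ~/MobaXterm/logs/audit
cat > ~/MobaXterm/audit.conf << EOF
[Audit]
EnableCommandLogging=1
EnableSessionLogging=1
LogDirectory=$HOME/MobaXterm/logs/audit
RetentionDays=90
LogRotateSize=100M
SensitiveCommandAlert=1
EOF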

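# 7. Firewall rule
# An inbound rule for port 22 is only needed when this machine also runs an
# SSH server; MobaXterm as a client only makes outbound connections.
echo "Configuring Windows Firewall..."
netsh advfirewall firewall add rule name="MobaXterm SSH" dir=in action=allow protocol=TCP localport=22 enable=yes profile=any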

# 8. 禁用危险功能
cat >> ~/MobaXterm.ini << EOF
[Restrictions]
DisableTunnelCreation=0
DisableFileTransfer=0
DisablePluginInstallation=1
AllowUnsafeConnections=0
EOF

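# 9. Back up configuration files
echo "Backing up configuration files..."
# Use $HOME here: a tilde inside double quotes is not expanded by the shell
backup_dir="$HOME/MobaXterm/backup/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$backup_dir"
cp ~/MobaXterm.ini "$backup_dir/"
cp -r ~/MobaXterm/sessions/ "$backup_dir/" 2>/dev/null || true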

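# 10. Create the security report
cat > "security_hardening_report.txt" << EOF
=== MobaXterm Security Hardening Report ===
Completed at: $(date)
Hardening items:
1. Configuration file permissions set
2. Session encryption enabled
3. SSH key pair generated: ~/.ssh/moba_key
4. SSH configuration updated
5. Auto-lock enabled
6. Audit logging configured
7. Firewall rule added
8. Risky features restricted
9. Configuration files backed up to: $backup_dir

Security recommendations:
- Rotate SSH keys regularly
- Review the audit logs: ~/MobaXterm/logs/audit/
- Enable two-factor authentication
- Keep MobaXterm up to date
EOF

echo "Security hardening complete!"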
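
The permission steps in the script above (`chmod 600` on files, `chmod 700` on directories) can be checked afterwards. The sketch below is one way to do that in Python, assuming POSIX-style mode bits are meaningful in the environment; under Windows or Cygwin they may not reflect the real ACLs. The function name `find_loose_permissions` is hypothetical.

```python
import stat
from pathlib import Path
from typing import Dict, List


def find_loose_permissions(expected: Dict[str, int]) -> List[str]:
    """Return one message per path whose mode grants bits beyond the allowed maximum."""
    problems = []
    for raw_path, max_mode in expected.items():
        path = Path(raw_path).expanduser()
        if not path.exists():
            continue  # nothing to check for missing paths
        mode = stat.S_IMODE(path.stat().st_mode)
        # Any permission bit set outside max_mode is too permissive
        if mode & ~max_mode:
            problems.append(f"{path}: mode {mode:o}, expected at most {max_mode:o}")
    return problems


if __name__ == "__main__":
    # Paths taken from the hardening script above
    checks = {"~/.ssh": 0o700, "~/.ssh/moba_key": 0o600, "~/MobaXterm.ini": 0o600}
    for line in find_loose_permissions(checks):
        print(line)
```

An empty result means every existing path is at least as strict as requested; missing paths are skipped rather than reported.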

5.2 Performance Optimization Configuration

# performance_optimization.ps1
# MobaXterm performance optimization script

param(
    [switch]$Apply,
    [switch]$Revert,
    [switch]$Test
)

$MobaPath = "$env:USERPROFILE\Documents\MobaXterm"
$IniFile = "$MobaPath\MobaXterm.ini"
$BackupFile = "$MobaPath\MobaXterm.ini.backup"

function Optimize-Performance {
    Write-Host "开始优化MobaXterm性能..." -ForegroundColor Cyan
    
    # 备份原配置
    if (Test-Path $IniFile) {
        Copy-Item $IniFile $BackupFile -Force
        Write-Host "配置文件已备份到: $BackupFile" -ForegroundColor Green
    }
    
    # 性能优化配置
    $optimizations = @"
[MobaXterm]
; 网络优化
TCPKeepAlive=1
CompressionLevel=9
UseDNS=0
AutoReconnect=1
ReconnectDelay=5

; 内存和CPU优化
MaxScrollbackLines=5000
UseBackgroundThread=1
AntialiasedText=1
DisableAnimation=1

; 渲染优化
DoubleBuffering=1
UseDirectWrite=1
FontQuality=CLEARTYPE_NATURAL_QUALITY

; 会话管理优化
SessionCacheSize=50
EnableSessionCompression=1
AutoSaveSession=1
SaveSessionInterval=300

; 文件传输优化
SFTPBufferSize=65536
ParallelTransfers=4
TransferRetryCount=3
ResumeBrokenTransfers=1

; 显示优化
DisableBanner=1
HideMenuBar=0
UseTabs=1
TabColorMode=1

; 终端性能
TypeAhead=1
DisableAltF4=0
EnableX11Forwarding=1
X11UseLocalhost=1
"@
    
    # 应用优化配置
    Add-Content -Path $IniFile -Value $optimizations -Encoding UTF8
    Write-Host "性能优化配置已应用" -ForegroundColor Green
    
    # Create the performance test script.
    # A single-quoted here-string keeps $(Get-Date), $testFile and $_ literal,
    # so they are evaluated when the generated script runs, not when it is written.
    $testScript = @'
# MobaXterm performance test
echo "=== Performance test started ==="
echo "Test time: $(Get-Date)"

# Test network connectivity
echo "`n1. Network connection test:"
Test-NetConnection -ComputerName github.com -Port 443 | Select-Object ComputerName, RemotePort, TcpTestSucceeded

# Test SSH connection
echo "`n2. SSH connection test:"
Measure-Command { ssh localhost "echo 'SSH connection test succeeded'" } | Select-Object TotalSeconds

# Test file transfer speed
echo "`n3. File transfer test:"
$testFile = [System.IO.Path]::GetTempFileName()
1..1000 | ForEach-Object { "test data $_" } | Set-Content $testFile
Measure-Command { scp $testFile localhost:/tmp/ } | Select-Object TotalSeconds
Remove-Item $testFile

# Test X11 forwarding performance
echo "`n4. X11 forwarding test:"
if (Test-Path "C:\Windows\System32\calc.exe") {
    Measure-Command { 
        ssh -X localhost "xcalc &" 
        Start-Sleep 2
        Get-Process xcalc* | Stop-Process
    } | Select-Object TotalSeconds
}

echo "`n=== Performance test finished ==="
'@
    
    Set-Content -Path "$MobaPath\performance_test.ps1" -Value $testScript -Encoding UTF8
    Write-Host "性能测试脚本已创建: $MobaPath\performance_test.ps1" -ForegroundColor Green
}

function Test-Performance {
    Write-Host "运行性能测试..." -ForegroundColor Cyan
    & "$MobaPath\performance_test.ps1"
    
    # 基准对比
    $benchmarks = @{
        "SSH连接延迟" = "应 < 0.5秒"
        "文件传输速度" = "应 > 1 MB/s"
        "X11启动时间" = "应 < 3秒"
        "内存使用" = "应 < 100 MB"
    }
    
    Write-Host "`n=== 性能基准 ===" -ForegroundColor Yellow
    $benchmarks.GetEnumerator() | ForEach-Object {
        Write-Host "$($_.Key): $($_.Value)" -ForegroundColor White
    }
}

function Revert-Changes {
    if (Test-Path $BackupFile) {
        Write-Host "恢复原始配置..." -ForegroundColor Yellow
        Copy-Item $BackupFile $IniFile -Force
        Write-Host "配置已恢复" -ForegroundColor Green
    } else {
        Write-Host "未找到备份文件" -ForegroundColor Red
    }
}

# 主逻辑
if ($Apply) {
    Optimize-Performance
    Test-Performance
} elseif ($Revert) {
    Revert-Changes
} elseif ($Test) {
    Test-Performance
} else {
    Write-Host "使用方法:" -ForegroundColor Yellow
    Write-Host "  .\performance_optimization.ps1 -Apply    # 应用优化并测试"
    Write-Host "  .\performance_optimization.ps1 -Revert   # 恢复原始配置"
    Write-Host "  .\performance_optimization.ps1 -Test     # 仅运行性能测试"
}
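
Because the script above appends a whole `[MobaXterm]` block on every `-Apply` run, the INI file can accumulate duplicate sections. The sketch below shows one way to merge settings into an existing section instead, using Python's `configparser`. It assumes the file is plain `key=value` INI that `configparser` can parse, and note that `configparser` drops comments when writing. The function name `merge_ini_settings` is hypothetical, and the keys in the usage example are taken from the article's listings rather than from verified MobaXterm settings.

```python
import configparser
import io
from typing import Dict


def merge_ini_settings(ini_text: str, section: str, updates: Dict[str, str]) -> str:
    """Return ini_text with `updates` merged into `section` (created if absent)."""
    # strict=False merges duplicate sections instead of raising an error;
    # interpolation=None keeps literal '%' characters untouched
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    parser.optionxform = str  # keep key case exactly as written
    parser.read_string(ini_text)
    if not parser.has_section(section):
        parser.add_section(section)
    for key, value in updates.items():
        parser.set(section, key, value)
    buffer = io.StringIO()
    parser.write(buffer, space_around_delimiters=False)
    return buffer.getvalue()


if __name__ == "__main__":
    original = "[MobaXterm]\nFontSize=11\nScrollbackLines=10000\n"
    print(merge_ini_settings(original, "MobaXterm", {"ScrollbackLines": "5000", "UseTabs": "1"}))
```

Running the merge twice with the same updates yields the same text, so the operation is safe to repeat.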

6. Advanced Plugin Development

6.1 Custom Plugin Development Guide

#!/usr/bin/env python3
# moba_custom_plugin.py
# Custom MobaXterm plugin development

"""
MobaXterm插件结构:
MobaXterm/
├── plugins/
│   ├── MyPlugin/
│   │   ├── plugin.ini      # 插件配置文件
│   │   ├── main.py         # 主程序
│   │   ├── ui.xml          # 用户界面定义
│   │   └── resources/      # 资源文件
│   └── ThirdPartyPlugins/  # 第三方插件目录
"""

import configparser
import json
import os
import sys
import tkinter as tk
from tkinter import ttk, messagebox
from pathlib import Path

class MobaPlugin:
    """MobaXterm插件基类"""
    
    def __init__(self, plugin_path):
        self.plugin_path = Path(plugin_path)
        self.config = self.load_config()
        self.ui_elements = {}
        
    def load_config(self):
        """加载插件配置"""
        config_file = self.plugin_path / "plugin.ini"
        config = configparser.ConfigParser()
        config.read(config_file)
        return config
    
    def create_gui(self):
        """创建插件界面"""
        self.root = tk.Tk()
        self.root.title(self.config.get('Plugin', 'Name', fallback='MobaXterm插件'))
        
        # 设置窗口大小和位置
        width = self.config.getint('UI', 'Width', fallback=600)
        height = self.config.getint('UI', 'Height', fallback=400)
        self.root.geometry(f"{width}x{height}")
        
        # 创建主框架
        self.create_main_frame()
        
        # 绑定事件
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)
        
        return self.root
    
    def create_main_frame(self):
        """创建主框架"""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        
        # 标题
        title = ttk.Label(
            main_frame,
            text=self.config.get('Plugin', 'Name', fallback='MobaXterm插件'),
            font=("Arial", 16, "bold")
        )
        title.grid(row=0, column=0, columnspan=2, pady=(0, 20))
        
        # 服务器列表
        self.create_server_list(main_frame)
        
        # 操作按钮
        self.create_action_buttons(main_frame)
        
        # 日志输出
        self.create_log_output(main_frame)
        
        # Configure grid weights so the log area (row 5) absorbs extra space
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(5, weight=1)
    
    def create_server_list(self, parent):
        """创建服务器列表"""
        ttk.Label(parent, text="服务器列表:").grid(row=1, column=0, sticky=tk.W, pady=(0, 5))
        
        # 服务器列表框
        frame = ttk.Frame(parent)
        frame.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
        
        # 服务器树形列表
        self.server_tree = ttk.Treeview(frame, columns=('status', 'name', 'host'), show='tree headings')
        self.server_tree.heading('#0', text='组')
        self.server_tree.heading('status', text='状态')
        self.server_tree.heading('name', text='名称')
        self.server_tree.heading('host', text='地址')
        
        # 设置列宽
        self.server_tree.column('#0', width=100)
        self.server_tree.column('status', width=60)
        self.server_tree.column('name', width=120)
        self.server_tree.column('host', width=150)
        
        # 滚动条
        scrollbar = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self.server_tree.yview)
        self.server_tree.configure(yscrollcommand=scrollbar.set)
        
        self.server_tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
        
        frame.columnconfigure(0, weight=1)
        frame.rowconfigure(0, weight=1)
        
        # 加载服务器数据
        self.load_servers()
    
    def load_servers(self):
        """Load server entries from MobaXterm session files."""
        sessions_path = Path.home() / "Documents" / "MobaXterm" / "sessions"
        
        if sessions_path.exists():
            for session_file in sessions_path.glob("*.mx*"):
                try:
                    with open(session_file, 'r', encoding='utf-8', errors='replace') as f:
                        content = f.read()
                    
                    # Parse the session file
                    name = session_file.stem
                    host = self.extract_host(content)
                    group = self.extract_group(content)
                    
                    # Add the group node once, then the server beneath it
                    if not self.server_tree.exists(group):
                        self.server_tree.insert('', 'end', group, text=group)
                    
                    self.server_tree.insert(
                        group, 'end',
                        values=('●', name, host)
                    )
                except (OSError, tk.TclError):
                    # Skip unreadable files and entries the tree rejects
                    continue
    
    def extract_host(self, content):
        """从会话内容提取主机信息"""
        import re
        match = re.search(r'Host=([^\n]+)', content)
        return match.group(1) if match else "N/A"
    
    def extract_group(self, content):
        """从会话内容提取组信息"""
        import re
        match = re.search(r'Group=([^\n]+)', content)
        return match.group(1) if match else "默认组"
    
    def create_action_buttons(self, parent):
        """创建操作按钮"""
        button_frame = ttk.Frame(parent)
        button_frame.grid(row=3, column=0, columnspan=2, pady=(10, 0))
        
        actions = [
            ("连接", self.connect_server),
            ("断开", self.disconnect_server),
            ("监控", self.monitor_server),
            ("执行命令", self.execute_command),
            ("传输文件", self.transfer_file),
            ("生成报告", self.generate_report)
        ]
        
        for i, (text, command) in enumerate(actions):
            btn = ttk.Button(button_frame, text=text, command=command)
            btn.grid(row=0, column=i, padx=2)
    
    def create_log_output(self, parent):
        """创建日志输出区域"""
        ttk.Label(parent, text="操作日志:").grid(row=4, column=0, sticky=tk.W, pady=(10, 5))
        
        log_frame = ttk.Frame(parent)
        log_frame.grid(row=5, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
        
        # 日志文本框
        self.log_text = tk.Text(log_frame, height=10, wrap=tk.WORD)
        log_scrollbar = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
        self.log_text.configure(yscrollcommand=log_scrollbar.set)
        
        self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        log_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
        
        log_frame.columnconfigure(0, weight=1)
        log_frame.rowconfigure(0, weight=1)
    
    def log_message(self, message, level="INFO"):
        """记录日志消息"""
        import datetime
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        formatted = f"[{timestamp}] [{level}] {message}\n"
        
        self.log_text.insert(tk.END, formatted)
        self.log_text.see(tk.END)
        
        # 颜色编码
        if level == "ERROR":
            self.log_text.tag_add("error", "end-2l", "end-1l")
            self.log_text.tag_config("error", foreground="red")
        elif level == "WARNING":
            self.log_text.tag_add("warning", "end-2l", "end-1l")
            self.log_text.tag_config("warning", foreground="orange")
    
    def connect_server(self):
        """连接选中的服务器"""
        selection = self.server_tree.selection()
        if not selection:
            messagebox.showwarning("警告", "请先选择一个服务器")
            return
        
        item = selection[0]
        values = self.server_tree.item(item, 'values')
        if values:
            server_name = values[1]
            self.log_message(f"正在连接到服务器: {server_name}")
            # 实际连接逻辑...
    
    def disconnect_server(self):
        """断开服务器连接"""
        self.log_message("断开服务器连接", "INFO")
    
    def monitor_server(self):
        """监控服务器状态"""
        self.log_message("开始监控服务器", "INFO")
    
    def execute_command(self):
        """执行远程命令"""
        # 创建命令输入对话框
        dialog = tk.Toplevel(self.root)
        dialog.title("执行命令")
        dialog.geometry("400x200")
        
        ttk.Label(dialog, text="输入要执行的命令:").pack(pady=10)
        
        cmd_entry = ttk.Entry(dialog, width=50)
        cmd_entry.pack(pady=5)
        
        def execute():
            command = cmd_entry.get()
            if command:
                self.log_message(f"执行命令: {command}")
                # 执行远程命令逻辑...
                dialog.destroy()
        
        ttk.Button(dialog, text="执行", command=execute).pack(pady=10)
    
    def transfer_file(self):
        """传输文件"""
        self.log_message("文件传输功能", "INFO")
    
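    def generate_report(self):
        """Generate a sample HTML report and open it in the browser."""
        import datetime
        import webbrowser
        self.log_message("Generating operations report", "INFO")
        
        # Build a sample report stamped with the actual generation time
        generated_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        report_html = f"""
        <html>
        <head><meta charset="utf-8"><title>Operations Report</title></head>
        <body>
            <h1>MobaXterm Plugin Operations Report</h1>
            <p>Generated at: {generated_at}</p>
        </body>
        </html>
        """
        
        report_file = self.plugin_path / "report.html"
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report_html)
        
        # as_uri() produces a valid file:// URL on Windows as well
        webbrowser.open(report_file.resolve().as_uri())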
    
    def on_close(self):
        """关闭插件窗口"""
        if messagebox.askokcancel("退出", "确定要退出插件吗?"):
            self.root.destroy()
    
    def run(self):
        """运行插件"""
        self.create_gui()
        self.root.mainloop()

# 插件配置文件模板
def create_plugin_template(plugin_name):
    """创建插件模板"""
    plugin_dir = Path.home() / "Documents" / "MobaXterm" / "plugins" / plugin_name
    plugin_dir.mkdir(parents=True, exist_ok=True)
    
    # 创建plugin.ini
    plugin_ini = f"""[Plugin]
Name = {plugin_name}
Version = 1.0.0
Author = Your Name
Description = 这是一个MobaXterm插件示例
Icon = icon.ico

[UI]
Width = 800
Height = 600

[Settings]
AutoStart = 0
RefreshInterval = 60
"""
    
    (plugin_dir / "plugin.ini").write_text(plugin_ini, encoding='utf-8')
    
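    # Create the main program file by copying this script; read it as UTF-8
    # explicitly, since the platform default encoding may not handle it
    main_py = Path(__file__).read_text(encoding='utf-8')
    (plugin_dir / "main.py").write_text(main_py, encoding='utf-8')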
    
    # 创建空资源目录
    (plugin_dir / "resources").mkdir(exist_ok=True)
    
    print(f"插件模板已创建到: {plugin_dir}")

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "create":
        plugin_name = sys.argv[2] if len(sys.argv) > 2 else "MyPlugin"
        create_plugin_template(plugin_name)
    else:
        # 运行插件
        plugin_path = Path(__file__).parent
        plugin = MobaPlugin(plugin_path)
        plugin.run()
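
The `extract_host` and `extract_group` helpers above can be exercised without the GUI. The sketch below restates them as one standalone function, anchored to the start of a line so that a key such as `RemoteHost=` is not mistaken for `Host=`. The `Host=` and `Group=` key format is the assumption carried over from the plugin code, not a documented MobaXterm session format, and `parse_session` is a hypothetical name.

```python
import re
from typing import Dict

# Anchor both patterns to the start of a line (re.MULTILINE)
_HOST_RE = re.compile(r"^Host=(.+)$", re.MULTILINE)
_GROUP_RE = re.compile(r"^Group=(.+)$", re.MULTILINE)


def parse_session(content: str) -> Dict[str, str]:
    """Extract host and group from session text, with defaults when absent."""
    host = _HOST_RE.search(content)
    group = _GROUP_RE.search(content)
    return {
        # strip() also removes a trailing '\r' from Windows line endings
        "host": host.group(1).strip() if host else "N/A",
        "group": group.group(1).strip() if group else "Default",
    }


if __name__ == "__main__":
    sample = "RemoteHost=ignored\r\nHost=192.168.1.10\r\nGroup=web\r\n"
    print(parse_session(sample))
    print(parse_session(""))
```

Keeping the parsing separate from the Treeview code makes it easy to test against real session files before wiring it into the plugin.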

7. Case Study: An Automated Deployment System

7.1 A Complete Automated Deployment System

#!/usr/bin/env python3
# moba_deployment_system.py
# MobaXterm-based automated deployment system

import os
import sys
import json
import yaml
import paramiko
import hashlib
import threading
import queue
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any
from enum import Enum
from concurrent.futures import ThreadPoolExecutor, as_completed

class DeploymentStatus(Enum):
    PENDING = "pending"
    PREPARING = "preparing"
    DEPLOYING = "deploying"
    VERIFYING = "verifying"
    COMPLETED = "completed"
    FAILED = "failed"
    ROLLED_BACK = "rolled_back"

@dataclass
class DeploymentTarget:
    name: str
    host: str
    port: int = 22
    username: str = "root"
    groups: Optional[List[str]] = None
    pre_deploy_scripts: Optional[List[str]] = None
    post_deploy_scripts: Optional[List[str]] = None
    
    def __post_init__(self):
        if self.groups is None:
            self.groups = []
        if self.pre_deploy_scripts is None:
            self.pre_deploy_scripts = []
        if self.post_deploy_scripts is None:
            self.post_deploy_scripts = []

@dataclass
class DeploymentArtifact:
    name: str
    version: str
    source_path: str
    target_path: str
    checksum: Optional[str] = None
    backup_path: Optional[str] = None
    
    def calculate_checksum(self):
        """计算文件校验和"""
        if os.path.exists(self.source_path):
            hash_md5 = hashlib.md5()
            with open(self.source_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
            self.checksum = hash_md5.hexdigest()
        return self.checksum

@dataclass
class DeploymentStep:
    name: str
    type: str  # command, upload, download, backup, rollback
    command: Optional[str] = None
    source: Optional[str] = None
    target: Optional[str] = None
    timeout: int = 300
    retry_count: int = 3
    rollback_command: Optional[str] = None

class DeploymentSystem:
    """自动化部署系统"""
    
    def __init__(self, config_path: str = "deployment_config.yaml"):
        self.config_path = Path(config_path)
        self.config = self.load_config()
        self.deployments = {}
        self.setup_logging()
        
    def load_config(self) -> Dict:
        """加载部署配置"""
        if not self.config_path.exists():
            # 创建示例配置
            sample_config = {
                'deployment': {
                    'max_parallel': 5,
                    'timeout': 1800,
                    'backup_enabled': True,
                    'backup_retention': 7,
                    'notification': {
                        'enabled': True,
                        'webhook': 'https://hooks.slack.com/services/...'
                    }
                },
                'artifacts': [
                    {
                        'name': 'webapp',
                        'source': './dist/webapp.tar.gz',
                        'target': '/opt/webapp'
                    }
                ],
                'targets': [
                    {
                        'name': 'web-server-1',
                        'host': '192.168.1.100',
                        'groups': ['web', 'production']
                    }
                ],
                'pipeline': {
                    'pre_deploy': [
                        {'type': 'command', 'cmd': 'systemctl stop nginx'}
                    ],
                    'deploy': [
                        {'type': 'upload', 'src': './dist/webapp.tar.gz', 'dest': '/tmp/'},
                        {'type': 'command', 'cmd': 'tar -xzf /tmp/webapp.tar.gz -C /opt/webapp'}
                    ],
                    'post_deploy': [
                        {'type': 'command', 'cmd': 'systemctl start nginx'},
                        {'type': 'command', 'cmd': 'systemctl status nginx'}
                    ],
                    'rollback': [
                        {'type': 'command', 'cmd': 'systemctl stop nginx'},
                        {'type': 'command', 'cmd': 'cp -r /opt/webapp_backup /opt/webapp'},
                        {'type': 'command', 'cmd': 'systemctl start nginx'}
                    ]
                }
            }
            
            with open(self.config_path, 'w') as f:
                yaml.dump(sample_config, f, default_flow_style=False)
            
            return sample_config
        
        with open(self.config_path, 'r') as f:
            return yaml.safe_load(f)
    
    def setup_logging(self):
        """配置日志系统"""
        log_dir = Path("deployment_logs")
        log_dir.mkdir(exist_ok=True)
        
        self.log_file = log_dir / f"deployment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        
        import logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=[
                logging.FileHandler(self.log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def create_deployment(self, artifact: DeploymentArtifact, 
                          targets: List[DeploymentTarget],
                          description: str = "") -> str:
        """创建部署任务"""
        deployment_id = hashlib.md5(
            f"{artifact.name}-{artifact.version}-{datetime.now()}".encode()
        ).hexdigest()[:8]
        
        deployment = {
            'id': deployment_id,
            'artifact': asdict(artifact),
            'targets': [asdict(t) for t in targets],
            'description': description,
            'status': DeploymentStatus.PENDING.value,
            'created_at': datetime.now().isoformat(),
            'steps': [],
            'results': {}
        }
        
        self.deployments[deployment_id] = deployment
        self.save_deployment_state(deployment_id)
        
        self.logger.info(f"创建部署任务: {deployment_id} - {artifact.name} v{artifact.version}")
        return deployment_id
    
    def execute_deployment(self, deployment_id: str):
        """执行部署任务"""
        deployment = self.deployments[deployment_id]
        deployment['status'] = DeploymentStatus.DEPLOYING.value
        deployment['started_at'] = datetime.now().isoformat()
        
        self.logger.info(f"开始执行部署: {deployment_id}")
        
        targets = deployment['targets']
        artifact = deployment['artifact']
        
        # 并行执行部署
        max_parallel = self.config['deployment'].get('max_parallel', 5)
        
        with ThreadPoolExecutor(max_workers=max_parallel) as executor:
            future_to_target = {}
            
            for target in targets:
                future = executor.submit(
                    self.deploy_to_target,
                    target,
                    artifact,
                    deployment_id
                )
                future_to_target[future] = target['name']
            
            # 收集结果
            for future in as_completed(future_to_target):
                target_name = future_to_target[future]
                try:
                    result = future.result()
                    deployment['results'][target_name] = result
                    
                    if result['success']:
                        self.logger.info(f"部署成功到 {target_name}")
                    else:
                        self.logger.error(f"部署失败到 {target_name}: {result.get('error')}")
                        
                except Exception as e:
                    self.logger.error(f"部署异常到 {target_name}: {e}")
                    deployment['results'][target_name] = {
                        'success': False,
                        'error': str(e)
                    }
        
        # 检查整体状态
        success_count = sum(1 for r in deployment['results'].values() if r.get('success'))
        total_count = len(targets)
        
        if success_count == total_count:
            deployment['status'] = DeploymentStatus.COMPLETED.value
            self.logger.info(f"部署完成: {success_count}/{total_count} 成功")
        else:
            deployment['status'] = DeploymentStatus.FAILED.value
            self.logger.error(f"部署失败: {success_count}/{total_count} 成功")
            
            # 自动回滚
            if self.config['deployment'].get('auto_rollback', True):
                self.logger.info("开始自动回滚...")
                self.rollback_deployment(deployment_id)
        
        deployment['completed_at'] = datetime.now().isoformat()
        self.save_deployment_state(deployment_id)
        
        # 发送通知
        self.send_notification(deployment)
        
        return deployment
    
    def deploy_to_target(self, target: Dict, artifact: Dict, deployment_id: str) -> Dict:
        """部署到单个目标"""
        result = {
            'target': target['name'],
            'success': False,
            'steps': [],
            'start_time': datetime.now().isoformat()
        }
        
        try:
            # 建立SSH连接
            ssh_client = self.connect_to_target(target)
            if not ssh_client:
                raise Exception(f"无法连接到 {target['name']}")
            
            # 执行预部署脚本
            self.logger.info(f"执行预部署脚本到 {target['name']}")
            for script in target.get('pre_deploy_scripts', []):
                step_result = self.execute_step(ssh_client, script)
                result['steps'].append(step_result)
                
                if not step_result['success']:
                    ssh_client.close()
                    result['error'] = f"预部署脚本失败: {script}"
                    return result
            
            # 执行部署流水线
            pipeline = self.config['pipeline']
            
            # 备份当前版本
            if self.config['deployment'].get('backup_enabled', True):
                backup_step = self.create_backup_step(artifact)
                backup_result = self.execute_step(ssh_client, backup_step)
                result['steps'].append(backup_result)
            
            # 执行部署步骤
            for step_config in pipeline.get('deploy', []):
                step = DeploymentStep(
                    name=step_config.get('name', 'deploy_step'),
                    type=step_config['type'],
                    command=step_config.get('cmd'),
                    source=step_config.get('src'),
                    target=step_config.get('dest')
                )
                
                step_result = self.execute_step(ssh_client, step)
                result['steps'].append(step_result)
                
                if not step_result['success']:
                    ssh_client.close()
                    result['error'] = f"部署步骤失败: {step.name}"
                    return result
            
            # 执行后部署脚本
            self.logger.info(f"执行后部署脚本到 {target['name']}")
            for script in target.get('post_deploy_scripts', []):
                step_result = self.execute_step(ssh_client, script)
                result['steps'].append(step_result)
                
                if not step_result['success']:
                    ssh_client.close()
                    result['error'] = f"后部署脚本失败: {script}"
                    return result
            
            # 验证部署
            if self.config['deployment'].get('verification_enabled', True):
                verify_result = self.verify_deployment(ssh_client, artifact)
                result['steps'].append(verify_result)
                
                if not verify_result['success']:
                    ssh_client.close()
                    result['error'] = "部署验证失败"
                    return result
            
            ssh_client.close()
            result['success'] = True
            result['end_time'] = datetime.now().isoformat()
            
        except Exception as e:
            result['error'] = str(e)
            result['end_time'] = datetime.now().isoformat()
        
        return result
    
    def connect_to_target(self, target: Dict) -> Optional[paramiko.SSHClient]:
        """Connect to a target server; returns None if the connection fails."""
        try:
            client = paramiko.SSHClient()
            # Load known hosts first. AutoAddPolicy silently trusts unknown
            # hosts, which is convenient in a lab but open to man-in-the-middle
            # attacks; prefer paramiko.RejectPolicy() in production.
            client.load_system_host_keys()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            connect_args = {
                'hostname': target['host'],
                'port': target.get('port', 22),
                'username': target['username'],
            }
            
            # Multiple authentication methods are supported
            if target.get('key_file'):
                # key_filename lets paramiko detect the key type (RSA, Ed25519, ...)
                connect_args['key_filename'] = os.path.expanduser(target['key_file'])
            elif target.get('password'):
                connect_args['password'] = target['password']
            # Otherwise paramiko falls back to the SSH agent and default key files
            
            client.connect(**connect_args)
            return client
            
        except Exception as e:
            self.logger.error(f"Connection failed {target['name']}: {e}")
            return None
    
    def execute_step(self, ssh_client: paramiko.SSHClient, step) -> Dict:
        """执行部署步骤"""
        step_result = {
            'step': step.name if hasattr(step, 'name') else str(step),
            'type': step.type if hasattr(step, 'type') else 'command',
            'success': False,
            'output': '',
            'start_time': datetime.now().isoformat()
        }
        
        try:
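            if step_result['type'] == 'command':
                # Run the command
                cmd = step.command if hasattr(step, 'command') else step
                stdin, stdout, stderr = ssh_client.exec_command(cmd)
                # Read the output before waiting for the exit status; waiting
                # first can hang when the output exceeds the channel window
                output = stdout.read().decode('utf-8', errors='ignore')
                error_output = stderr.read().decode('utf-8', errors='ignore')
                exit_code = stdout.channel.recv_exit_status()
                
                step_result['exit_code'] = exit_code
                step_result['output'] = output
                step_result['success'] = (exit_code == 0)
                
                if exit_code != 0:
                    step_result['error'] = error_output
            
            elif step_result['type'] == 'upload':
                # Upload a file. sftp.put() needs a full remote file path,
                # so append the file name when the target is a directory
                import posixpath
                remote_path = step.target
                if remote_path.endswith('/'):
                    remote_path = posixpath.join(remote_path, os.path.basename(step.source))
                sftp = ssh_client.open_sftp()
                sftp.put(step.source, remote_path)
                sftp.close()
                step_result['success'] = True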
            
            elif step_result['type'] == 'download':
                # 下载文件
                sftp = ssh_client.open_sftp()
                sftp.get(step.source, step.target)
                sftp.close()
                step_result['success'] = True
            
            elif step_result['type'] == 'backup':
                # 创建备份
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                backup_cmd = f"cp -r {step.target} {step.target}_backup_{timestamp}"
                stdin, stdout, stderr = ssh_client.exec_command(backup_cmd)
                exit_code = stdout.channel.recv_exit_status()
                step_result['success'] = (exit_code == 0)
        
        except Exception as e:
            step_result['error'] = str(e)
        
        step_result['end_time'] = datetime.now().isoformat()
        return step_result
    
    def create_backup_step(self, artifact: Dict) -> DeploymentStep:
        """创建备份步骤"""
        return DeploymentStep(
            name="backup_current_version",
            type="command",
            command=f"cp -r {artifact['target_path']} {artifact['target_path']}_backup_$(date +%Y%m%d_%H%M%S)"
        )
    
    def verify_deployment(self, ssh_client: paramiko.SSHClient, artifact: Dict) -> Dict:
        """验证部署结果"""
        verify_steps = [
            DeploymentStep(
                name="check_file_exists",
                type="command",
                command=f"ls -la {artifact['target_path']}"
            ),
            DeploymentStep(
                name="check_service_status",
                type="command",
                command="systemctl is-active nginx || echo 'Service check skipped'"
            ),
            DeploymentStep(
                name="verify_checksum",
                type="command",
                command=f"find {artifact['target_path']} -type f -exec md5sum {{}} \\; | sort"
            )
        ]
        
        verify_result = {
            'name': 'deployment_verification',
            'type': 'verification',
            'success': True,
            'steps': [],
            'start_time': datetime.now().isoformat()
        }
        
        for step in verify_steps:
            step_result = self.execute_step(ssh_client, step)
            verify_result['steps'].append(step_result)
            
            if not step_result['success']:
                verify_result['success'] = False
                verify_result['error'] = f"验证失败: {step.name}"
                break
        
        verify_result['end_time'] = datetime.now().isoformat()
        return verify_result
    
    def rollback_deployment(self, deployment_id: str):
        """Roll back a deployment."""
        deployment = self.deployments[deployment_id]
        self.logger.info(f"开始回滚部署: {deployment_id}")
        
        deployment['status'] = DeploymentStatus.ROLLED_BACK.value
        deployment['rollback_started_at'] = datetime.now().isoformat()
        
        targets = deployment['targets']
        rollback_steps = self.config['pipeline'].get('rollback', [])
        
        for target in targets:
            try:
                ssh_client = self.connect_to_target(target)
                if not ssh_client:
                    continue
                
                # 执行回滚步骤
                for step_config in rollback_steps:
                    step = DeploymentStep(
                        name=step_config.get('name', 'rollback_step'),
                        type=step_config['type'],
                        command=step_config.get('cmd'),
                        source=step_config.get('src'),
                        target=step_config.get('dest')
                    )
                    
                    step_result = self.execute_step(ssh_client, step)
                    if not step_result['success']:
                        self.logger.error(f"回滚失败 {target['name']}: {step.name}")
                        break
                
                ssh_client.close()
                
            except Exception as e:
                self.logger.error(f"回滚异常 {target['name']}: {e}")
        
        deployment['rollback_completed_at'] = datetime.now().isoformat()
        self.save_deployment_state(deployment_id)
        self.logger.info(f"回滚完成: {deployment_id}")
    
    def save_deployment_state(self, deployment_id: str):
        """Save the deployment state."""
        state_dir = Path("deployment_states")
        state_dir.mkdir(exist_ok=True)
        
        state_file = state_dir / f"{deployment_id}.json"
        with open(state_file, 'w') as f:
            json.dump(self.deployments[deployment_id], f, indent=2, default=str)
    
    def send_notification(self, deployment: Dict):
        """Send a deployment notification."""
        if not self.config['deployment'].get('notification', {}).get('enabled', False):
            return
        
        import requests
        
        webhook_url = self.config['deployment']['notification'].get('webhook')
        if not webhook_url:
            return
        
        status = deployment['status']
        artifact = deployment['artifact']
        targets = deployment['targets']
        
        # Build the notification message
        message = {
            "text": f"Deployment notification: {artifact['name']} v{artifact['version']}",
            "attachments": [{
                "color": "good" if status == "completed" else "danger",
                "fields": [
                    {"title": "Deployment ID", "value": deployment.get('id', 'N/A'), "short": True},
                    {"title": "Status", "value": status, "short": True},
                    {"title": "Version", "value": artifact['version'], "short": True},
                    {"title": "Target servers", "value": str(len(targets)), "short": True},
                    {"title": "Started at", "value": deployment.get('started_at', 'N/A'), "short": True},
                    {"title": "Completed at", "value": deployment.get('completed_at', 'N/A'), "short": True}
                ]
            }]
        }
        
        try:
            # A timeout keeps a stalled webhook from blocking the deployment
            response = requests.post(webhook_url, json=message, timeout=10)
            if response.status_code == 200:
                self.logger.info("Notification sent")
            else:
                self.logger.warning(f"Notification failed: {response.status_code}")
        except Exception as e:
            self.logger.error(f"Notification error: {e}")

class DeploymentCLI:
    """Command-line interface for the deployment system."""
    
    def __init__(self):
        self.system = DeploymentSystem()
        
    def run(self):
        """Run the CLI."""
        import argparse

        parser = argparse.ArgumentParser(description="MobaXterm automated deployment system")
        subparsers = parser.add_subparsers(dest="command", help="Available commands")

        # create: define a new deployment task
        create_parser = subparsers.add_parser("create", help="Create a deployment task")
        create_parser.add_argument("--artifact", required=True, help="Path to the deployment artifact")
        create_parser.add_argument("--version", required=True, help="Version number")
        create_parser.add_argument("--targets", nargs="+", help="List of target servers")
        create_parser.add_argument("--groups", nargs="+", help="Target server groups")
        create_parser.add_argument("--description", help="Deployment description")

        # execute: run a deployment task
        execute_parser = subparsers.add_parser("execute", help="Execute a deployment task")
        execute_parser.add_argument("deployment_id", help="Deployment task ID")

        # status: show one deployment, or all of them when no ID is given
        status_parser = subparsers.add_parser("status", help="Show deployment status")
        status_parser.add_argument("deployment_id", nargs="?", help="Deployment task ID")

        # rollback: roll a deployment back
        rollback_parser = subparsers.add_parser("rollback", help="Roll back a deployment")
        rollback_parser.add_argument("deployment_id", help="Deployment task ID")

        # list: list all deployment tasks
        subparsers.add_parser("list", help="List all deployment tasks")

        # report: generate a deployment report
        report_parser = subparsers.add_parser("report", help="Generate a deployment report")
        report_parser.add_argument("deployment_id", help="Deployment task ID")
        report_parser.add_argument("--format", choices=["html", "json", "pdf"], default="html")
        
        args = parser.parse_args()
        
        if not args.command:
            parser.print_help()
            return
        
        if args.command == "create":
            self.create_deployment(args)
        elif args.command == "execute":
            self.execute_deployment(args.deployment_id)
        elif args.command == "status":
            self.show_status(args.deployment_id)
        elif args.command == "rollback":
            self.rollback_deployment(args.deployment_id)
        elif args.command == "list":
            self.list_deployments()
        elif args.command == "report":
            self.generate_report(args.deployment_id, args.format)
    
    def create_deployment(self, args):
        """Create a deployment task."""
        artifact = DeploymentArtifact(
            name=os.path.basename(args.artifact),
            version=args.version,
            source_path=args.artifact,
            target_path="/opt/" + os.path.basename(args.artifact).replace('.tar.gz', '')
        )
        artifact.calculate_checksum()
        
        # Select the target servers
        targets = []
        config_targets = self.system.config.get('targets', [])
        
        if args.targets:
            for target_name in args.targets:
                target = next((t for t in config_targets if t['name'] == target_name), None)
                if target:
                    targets.append(DeploymentTarget(**target))
        elif args.groups:
            for target in config_targets:
                if any(g in args.groups for g in target.get('groups', [])):
                    targets.append(DeploymentTarget(**target))
        else:
            # Use all targets
            for target in config_targets:
                targets.append(DeploymentTarget(**target))

        if not targets:
            print("Error: no target servers found")
            return
        
        deployment_id = self.system.create_deployment(
            artifact=artifact,
            targets=targets,
            description=args.description or ""
        )
        
        print("Deployment task created:")
        print(f"  ID: {deployment_id}")
        print(f"  Artifact: {artifact.name} v{artifact.version}")
        print(f"  Target servers: {len(targets)}")
        print(f"  Run it with: moba_deployment_system.py execute {deployment_id}")
    
    def execute_deployment(self, deployment_id: str):
        """Execute a deployment."""
        if deployment_id not in self.system.deployments:
            print(f"Error: deployment task {deployment_id} does not exist")
            return

        print(f"Starting deployment: {deployment_id}")
        print("=" * 50)

        deployment = self.system.execute_deployment(deployment_id)

        print("\nDeployment finished!")
        print(f"Status: {deployment['status']}")
        print(f"Started at: {deployment.get('started_at')}")
        print(f"Completed at: {deployment.get('completed_at')}")

        # Show the result summary
        results = deployment.get('results', {})
        success = sum(1 for r in results.values() if r.get('success'))
        total = len(results)

        print(f"\nResult summary: {success}/{total} succeeded")

        for target_name, result in results.items():
            status = "✓" if result.get('success') else "✗"
            print(f"  {status} {target_name}: {result.get('error', 'OK')}")
    
    def show_status(self, deployment_id: str = None):
        """Show deployment status."""
        if deployment_id:
            if deployment_id not in self.system.deployments:
                print(f"Error: deployment task {deployment_id} does not exist")
                return

            deployment = self.system.deployments[deployment_id]
            self.print_deployment_details(deployment)
        else:
            # Show all deployments
            state_dir = Path("deployment_states")
            if state_dir.exists():
                print(f"{'ID':<10} {'Status':<12} {'Artifact':<20} {'Version':<10} {'Created at':<20}")
                print("-" * 80)
                
                for state_file in state_dir.glob("*.json"):
                    with open(state_file, 'r') as f:
                        deployment = json.load(f)
                    
                    print(f"{deployment.get('id', 'N/A')[:8]:<10} "
                          f"{deployment.get('status', 'N/A'):<12} "
                          f"{deployment.get('artifact', {}).get('name', 'N/A')[:18]:<20} "
                          f"{deployment.get('artifact', {}).get('version', 'N/A')[:8]:<10} "
                          f"{deployment.get('created_at', 'N/A')[:19]:<20}")
            else:
                print("No deployment records yet")
    
    def print_deployment_details(self, deployment: Dict):
        """Print deployment details."""
        print(f"Deployment ID: {deployment.get('id')}")
        print(f"Description: {deployment.get('description', 'none')}")
        print(f"Status: {deployment.get('status')}")
        print(f"Created at: {deployment.get('created_at')}")
        print(f"Started at: {deployment.get('started_at', 'not started')}")
        print(f"Completed at: {deployment.get('completed_at', 'not completed')}")

        artifact = deployment.get('artifact', {})
        print("\nArtifact:")
        print(f"  Name: {artifact.get('name')}")
        print(f"  Version: {artifact.get('version')}")
        print(f"  Path: {artifact.get('target_path')}")
        print(f"  Checksum: {artifact.get('checksum', 'not computed')}")

        targets = deployment.get('targets', [])
        print(f"\nTarget servers ({len(targets)}):")
        for target in targets:
            print(f"  - {target.get('name')} ({target.get('host')})")

        results = deployment.get('results', {})
        if results:
            print("\nExecution results:")
            success = sum(1 for r in results.values() if r.get('success'))
            total = len(results)
            print(f"  Success rate: {success}/{total} ({success/total*100:.1f}%)")

            for target_name, result in results.items():
                status = "succeeded" if result.get('success') else "failed"
                duration = ""
                if result.get('start_time') and result.get('end_time'):
                    from datetime import datetime
                    start = datetime.fromisoformat(result['start_time'])
                    end = datetime.fromisoformat(result['end_time'])
                    duration = f" ({end - start})"

                print(f"  {target_name}: {status}{duration}")

                if not result.get('success'):
                    print(f"    Error: {result.get('error', 'unknown error')}")
    
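    def rollback_deployment(self, deployment_id: str):
        """Roll back a deployment."""
        # Check the ID first so an unknown ID does not surface as a KeyError
        if deployment_id not in self.system.deployments:
            print(f"Error: deployment task {deployment_id} does not exist")
            return

        confirm = input(f"Roll back deployment {deployment_id}? (y/N) ").strip().lower()

        if confirm != 'y':
            print("Cancelled")
            return

        self.system.rollback_deployment(deployment_id)
        print(f"Deployment {deployment_id} rolled back")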
    
    def list_deployments(self):
        """List all deployments."""
        self.show_status()
    
    def generate_report(self, deployment_id: str, format: str = "html"):
        """Generate a deployment report."""
        if deployment_id not in self.system.deployments:
            print(f"Error: deployment task {deployment_id} does not exist")
            return
        
        deployment = self.system.deployments[deployment_id]
        
        if format == "html":
            self.generate_html_report(deployment)
        elif format == "json":
            self.generate_json_report(deployment)
        elif format == "pdf":
            self.generate_pdf_report(deployment)
    
    def generate_html_report(self, deployment: Dict):
        """Generate an HTML report."""
        from datetime import datetime
        
        report_dir = Path("reports")
        report_dir.mkdir(exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_file = report_dir / f"deployment_{deployment['id']}_{timestamp}.html"
        
        # Build the HTML content
        html = f"""
        <!DOCTYPE html>
        <html lang="en">
        <head>
            <meta charset="UTF-8">
            <meta name="viewport" content="width=device-width, initial-scale=1.0">
            <title>Deployment Report - {deployment['id']}</title>
            <style>
                body {{ font-family: 'Microsoft YaHei', Arial, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; }}
                .container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 30px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }}
                h1 {{ color: #333; border-bottom: 3px solid #4CAF50; padding-bottom: 10px; }}
                h2 {{ color: #555; margin-top: 30px; }}
                .summary {{ background: #e8f5e9; padding: 20px; border-radius: 5px; margin: 20px 0; }}
                .status-success {{ color: #4CAF50; font-weight: bold; }}
                .status-failed {{ color: #f44336; font-weight: bold; }}
                .status-pending {{ color: #ff9800; font-weight: bold; }}
                .table {{ width: 100%; border-collapse: collapse; margin: 20px 0; }}
                .table th, .table td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
                .table th {{ background: #4CAF50; color: white; }}
                .table tr:nth-child(even) {{ background: #f9f9f9; }}
                .table tr:hover {{ background: #f1f1f1; }}
                .badge {{ display: inline-block; padding: 3px 8px; border-radius: 12px; font-size: 12px; color: white; }}
                .badge-success {{ background: #4CAF50; }}
                .badge-failed {{ background: #f44336; }}
                .badge-warning {{ background: #ff9800; }}
                .timeline {{ position: relative; margin: 30px 0; }}
                .timeline::before {{ content: ''; position: absolute; left: 50px; top: 0; bottom: 0; width: 2px; background: #4CAF50; }}
                .timeline-item {{ position: relative; margin: 20px 0; padding-left: 80px; }}
                .timeline-time {{ position: absolute; left: 0; width: 80px; text-align: right; padding-right: 15px; font-weight: bold; color: #666; }}
                .timeline-content {{ background: #e8f5e9; padding: 15px; border-radius: 5px; }}
                .chart-container {{ width: 100%; height: 300px; margin: 30px 0; }}
            </style>
            <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
        </head>
        <body>
            <div class="container">
                <h1>🏗️ Deployment Report</h1>

                <div class="summary">
                    <h2>📋 Overview</h2>
                    <p><strong>Deployment ID:</strong> {deployment['id']}</p>
                    <p><strong>Status:</strong> <span class="status-{deployment['status']}">{deployment['status'].upper()}</span></p>
                    <p><strong>Description:</strong> {deployment.get('description', 'none')}</p>
                    <p><strong>Created at:</strong> {deployment.get('created_at', 'N/A')}</p>
                    <p><strong>Execution window:</strong> {deployment.get('started_at', 'N/A')} - {deployment.get('completed_at', 'N/A')}</p>
                </div>

                <h2>📦 Artifact</h2>
                <table class="table">
                    <tr><th>Name</th><th>Version</th><th>Target path</th><th>Checksum</th></tr>
                    <tr>
                        <td>{deployment.get('artifact', {}).get('name', 'N/A')}</td>
                        <td>{deployment.get('artifact', {}).get('version', 'N/A')}</td>
                        <td>{deployment.get('artifact', {}).get('target_path', 'N/A')}</td>
                        <td><code>{(deployment.get('artifact', {}).get('checksum') or 'N/A')[:16]}...</code></td>
                    </tr>
                </table>

                <h2>🎯 Target Servers</h2>
                <table class="table">
                    <tr><th>Name</th><th>Host</th><th>User</th><th>Groups</th><th>Status</th></tr>
        """
        
        # Add one row per target server
        results = deployment.get('results', {})
        for target in deployment.get('targets', []):
            target_name = target.get('name')
            result = results.get(target_name, {})
            status_class = "success" if result.get('success') else "failed"
            status_text = "Succeeded" if result.get('success') else "Failed"
            
            html += f"""
                    <tr>
                        <td>{target_name}</td>
                        <td>{target.get('host')}</td>
                        <td>{target.get('username', 'N/A')}</td>
                        <td>{', '.join(target.get('groups', []))}</td>
                        <td><span class="badge badge-{status_class}">{status_text}</span></td>
                    </tr>
            """
        
        html += """
                </table>
                
                <h2>📊 Execution Statistics</h2>
                <div class="chart-container">
                    <canvas id="resultChart"></canvas>
                </div>

                <h2>⏱️ Timeline</h2>
                <div class="timeline">
        """
        
        # Add timeline entries
        timeline_events = [
            ("Created", deployment.get('created_at')),
            ("Started", deployment.get('started_at')),
            ("Completed", deployment.get('completed_at')),
            ("Rollback started", deployment.get('rollback_started_at')),
            ("Rollback completed", deployment.get('rollback_completed_at'))
        ]
        
        for event, time in timeline_events:
            if time:
                html += f"""
                    <div class="timeline-item">
                        <div class="timeline-time">{time[11:19]}</div>
                        <div class="timeline-content">{event}</div>
                    </div>
                """
        
        html += """
                </div>
                
                <h2>📝 Detailed Results</h2>
        """

        # Add detailed results
        for target_name, result in results.items():
            status_class = "success" if result.get('success') else "failed"
            
            html += f"""
                <div style="margin: 20px 0; padding: 15px; border-left: 4px solid #{'4CAF50' if result.get('success') else 'f44336'}; background: #f9f9f9;">
                    <h3>{target_name} - <span class="badge badge-{status_class}">{"Succeeded" if result.get('success') else "Failed"}</span></h3>
                    <p><strong>Execution window:</strong> {result.get('start_time', 'N/A')} - {result.get('end_time', 'N/A')}</p>
            """
            
            if not result.get('success'):
                html += f"<p><strong>Error:</strong> {result.get('error', 'unknown error')}</p>"

            # Show the steps
            steps = result.get('steps', [])
            if steps:
                html += "<h4>Steps:</h4><ul>"
                for step in steps:
                    step_status = "✓" if step.get('success') else "✗"
                    html += f"<li>{step_status} {step.get('step', 'N/A')}</li>"
                html += "</ul>"
            
            html += "</div>"
        
        # Add the chart script
        html += """
            <script>
                // Execution result statistics
                const results = """ + json.dumps(list(results.values()), default=str) + """;
                const successCount = results.filter(r => r.success).length;
                const failedCount = results.filter(r => !r.success).length;
                
                const ctx = document.getElementById('resultChart').getContext('2d');
                new Chart(ctx, {
                    type: 'doughnut',
                    data: {
                        labels: ['Succeeded', 'Failed'],
                        datasets: [{
                            data: [successCount, failedCount],
                            backgroundColor: ['#4CAF50', '#f44336'],
                            borderWidth: 1
                        }]
                    },
                    options: {
                        responsive: true,
                        plugins: {
                            legend: { position: 'top' },
                            title: {
                                display: true,
                                text: 'Deployment result distribution'
                            }
                        }
                    }
                });
                
                // Print shortcut
                document.addEventListener('keydown', function(e) {
                    if (e.ctrlKey && e.key === 'p') {
                        e.preventDefault();
                        window.print();
                    }
                });
            </script>
        """
        
        html += """
                <div style="margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd; text-align: center; color: #777; font-size: 12px;">
                    Report generated at: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """<br>
                    Generated by: MobaXterm automated deployment system
                </div>
            </div>
        </body>
        </html>
        """
        
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(html)
        
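        print(f"HTML report generated: {report_file}")

        # Try to open the report automatically
        try:
            import webbrowser
            # as_uri() yields a valid file:// URL on Windows as well
            webbrowser.open(report_file.absolute().as_uri())
        except Exception:
            pass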
    
    def generate_json_report(self, deployment: Dict):
        """Generate a JSON report."""
        report_dir = Path("reports")
        report_dir.mkdir(exist_ok=True)
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_file = report_dir / f"deployment_{deployment['id']}_{timestamp}.json"
        
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(deployment, f, indent=2, default=str)
        
        print(f"JSON report generated: {report_file}")
    
    def generate_pdf_report(self, deployment: Dict):
        """Generate a PDF report."""
        try:
            from reportlab.lib.pagesizes import letter
            from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image
            from reportlab.lib.styles import getSampleStyleSheet
            from reportlab.lib import colors
            from reportlab.lib.units import inch
            
            report_dir = Path("reports")
            report_dir.mkdir(exist_ok=True)
            
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            report_file = report_dir / f"deployment_{deployment['id']}_{timestamp}.pdf"
            
            # Create the PDF document
            doc = SimpleDocTemplate(str(report_file), pagesize=letter)
            styles = getSampleStyleSheet()
            story = []
            
            # Title
            title_style = styles['Title']
            title_style.alignment = 1  # centered
            story.append(Paragraph("Deployment Report", title_style))
            story.append(Spacer(1, 20))

            # Overview
            story.append(Paragraph("Overview", styles['Heading1']))

            overview_data = [
                ["Deployment ID:", deployment['id']],
                ["Status:", deployment['status'].upper()],
                ["Description:", deployment.get('description', 'none')],
                ["Created at:", deployment.get('created_at', 'N/A')],
                ["Execution window:", f"{deployment.get('started_at', 'N/A')} - {deployment.get('completed_at', 'N/A')}"]
            ]
            
            overview_table = Table(overview_data, colWidths=[1.5*inch, 4*inch])
            overview_table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
                ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTSIZE', (0, 0), (-1, -1), 10),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 6),
                ('TOPPADDING', (0, 0), (-1, -1), 6),
                ('GRID', (0, 0), (-1, -1), 1, colors.black)
            ]))
            
            story.append(overview_table)
            story.append(Spacer(1, 20))
            
            # Artifact
            story.append(Paragraph("Artifact", styles['Heading1']))

            artifact = deployment.get('artifact', {})
            artifact_data = [
                ["Name", "Version", "Target path"],
                [artifact.get('name', 'N/A'), artifact.get('version', 'N/A'), artifact.get('target_path', 'N/A')]
            ]
            
            artifact_table = Table(artifact_data, colWidths=[2*inch, 1.5*inch, 2.5*inch])
            artifact_table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('FONTSIZE', (0, 0), (-1, -1), 10),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 12),
                ('GRID', (0, 0), (-1, -1), 1, colors.black)
            ]))
            
            story.append(artifact_table)
            story.append(Spacer(1, 20))
            
            # Execution results
            story.append(Paragraph("Execution Results", styles['Heading1']))

            results = deployment.get('results', {})
            results_data = [["Server", "Status", "Start time", "End time", "Error"]]

            for target_name, result in results.items():
                status = "Succeeded" if result.get('success') else "Failed"
                results_data.append([
                    target_name,
                    status,
                    result.get('start_time', 'N/A')[:19],
                    result.get('end_time', 'N/A')[:19],
                    result.get('error', '')[:50]
                ])
            
            if len(results_data) > 1:
                results_table = Table(results_data, colWidths=[1.5*inch, 1*inch, 1.5*inch, 1.5*inch, 2*inch])
                results_table.setStyle(TableStyle([
                    ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
                    ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                    ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                    ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                    ('FONTSIZE', (0, 0), (-1, -1), 8),
                    ('BOTTOMPADDING', (0, 0), (-1, -1), 8),
                    ('GRID', (0, 0), (-1, -1), 1, colors.black),
                    ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.whitesmoke, colors.lightgrey])
                ]))
                
                story.append(results_table)
            else:
                story.append(Paragraph("No execution results", styles['Normal']))
            
            story.append(Spacer(1, 20))
            
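            # Statistics
            success_count = sum(1 for r in results.values() if r.get('success'))
            total_count = len(results)
            # Guard against division by zero when there are no results
            success_rate = success_count / total_count * 100 if total_count else 0.0

            # Paragraph collapses plain newlines, so use <br/> for line breaks
            stats_text = (
                "Deployment statistics:<br/>"
                f"- Total servers: {total_count}<br/>"
                f"- Succeeded: {success_count}<br/>"
                f"- Failed: {total_count - success_count}<br/>"
                f"- Success rate: {success_rate:.1f}%"
            )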
            
            story.append(Paragraph(stats_text, styles['Normal']))
            
            # Footer
            story.append(Spacer(1, 40))
            footer_text = f"Report generated at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | Generated by: MobaXterm automated deployment system"
            story.append(Paragraph(footer_text, styles['Normal']))

            # Build the PDF
            doc.build(story)
            print(f"PDF report generated: {report_file}")

        except ImportError:
            print("Error: install reportlab first: pip install reportlab")
        except Exception as e:
            print(f"Failed to generate PDF report: {e}")

def main():
    """Entry point."""
    cli = DeploymentCLI()
    cli.run()

if __name__ == "__main__":
    main()
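
The backup and verification steps in the script build their shell commands by interpolating artifact['target_path'] straight into an f-string, which breaks on paths that contain spaces or shell metacharacters. The following is a minimal sketch of one way to harden this, assuming the targets run a POSIX shell. The helper names build_backup_command and build_checksum_command are hypothetical and are not part of the original script.

```python
import shlex


def build_backup_command(target_path: str) -> str:
    """Return a cp command that copies target_path to a timestamped backup.

    The path is quoted with shlex.quote. The $(date ...) suffix is left
    unquoted on purpose so the remote shell still expands it.
    """
    quoted = shlex.quote(target_path)
    return f"cp -r {quoted} {quoted}_backup_$(date +%Y%m%d_%H%M%S)"


def build_checksum_command(target_path: str) -> str:
    """Return a find/md5sum command covering every file under target_path."""
    quoted = shlex.quote(target_path)
    return f"find {quoted} -type f -exec md5sum {{}} \\; | sort"
```

The returned strings could be passed as the command argument of a DeploymentStep in place of the inline f-strings. Paths without special characters come out unchanged, so existing configurations should behave as before.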

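8. Summary and Best Practices

8.1 Core Value of MobaXterm in Linux Operations

  1. Unified management platform: SSH, SFTP, X11, and macro recording in a single tool
  2. Efficient batch operations: run commands across multiple sessions at once
  3. Strong automation: recorded macros can be converted into Python scripts
  4. Convenient file management: drag-and-drop upload and download through the integrated SFTP browser
  5. Flexible extensibility: support for custom plugin development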

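8.2 Implementation Roadmap

  1. Beginner stage: basic configuration, session management, file transfer
  2. Intermediate stage: macro-recorded automation, batch server management, tunnel configuration
  3. Advanced stage: plugin development, CI/CD integration, monitoring and alerting
  4. Expert stage: custom deployment system, security hardening, performance tuning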

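8.3 Key Success Factors

  1. Standardized configuration: maintain a team-wide MobaXterm configuration template
  2. Knowledge sharing: keep an internal wiki of commonly used scripts and techniques
  3. Security first: audit SSH keys regularly and enable two-factor authentication
  4. Continuous improvement: review and tune the performance of automation scripts on a regular schedule
  5. Backup strategy: back up configuration files and session data regularly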
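
As an illustration of item 5, here is a minimal sketch of a timestamped configuration backup with simple retention. It assumes the caller supplies the path to the configuration file (the location of MobaXterm.ini depends on the installation), and the function name backup_config and the keep parameter are hypothetical.

```python
import shutil
import time
from pathlib import Path


def backup_config(config_file: Path, backup_dir: Path, keep: int = 10) -> Path:
    """Copy config_file into backup_dir under a timestamped name.

    Only the newest `keep` backups of this file are retained.
    """
    if keep < 1:
        raise ValueError("keep must be at least 1")

    backup_dir.mkdir(parents=True, exist_ok=True)
    stamp = time.strftime("%Y%m%d_%H%M%S")
    dest = backup_dir / f"{config_file.stem}_{stamp}{config_file.suffix}"
    shutil.copy2(config_file, dest)

    # Timestamped names sort chronologically, so the oldest come first.
    backups = sorted(backup_dir.glob(f"{config_file.stem}_*{config_file.suffix}"))
    for old in backups[:-keep]:
        old.unlink()
    return dest
```

Running this from a scheduled task is one way to make the backup regular. The timestamp has one-second resolution, so two calls within the same second overwrite the same backup file.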

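8.4 Common Problems and Solutions

# Problem 1: Slow MobaXterm connections
Solutions:
1. Enable compression: add Compression yes to the SSH client configuration
2. Adjust the cipher: use aes128-ctr instead of aes256-ctr
3. Disable DNS lookups: set UseDNS no in the server's sshd_config (this is an sshd option, not a client-side one)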
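
The first two items are client-side OpenSSH options. A minimal sketch of generating a matching Host block follows, assuming an OpenSSH-style client configuration file is in use (for example from MobaXterm's local terminal). The function name render_ssh_host_block and the example alias, host, and user are hypothetical, and the server must also support aes128-ctr for the cipher preference to take effect.

```python
def render_ssh_host_block(alias: str, hostname: str, user: str, port: int = 22) -> str:
    """Build an OpenSSH client config block with compression and a lighter cipher."""
    lines = [
        f"Host {alias}",
        f"    HostName {hostname}",
        f"    User {user}",
        f"    Port {port}",
        # Compression mainly helps on slow or high-latency links.
        "    Compression yes",
        # Prefer aes128-ctr but keep aes256-ctr as a fallback.
        "    Ciphers aes128-ctr,aes256-ctr",
    ]
    return "\n".join(lines) + "\n"
```

For example, print(render_ssh_host_block("web1", "192.168.1.10", "deploy")) produces a block that can be appended to ~/.ssh/config.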

# Problem 2: Interrupted file transfers
Solutions:
1. Enable resume: turn on Resume broken transfers in the SFTP settings
2. Increase the buffer size: SFTPBufferSize=65536
3. Use parallel transfers: ParallelTransfers=4
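
For scripted transfers, the resume idea in item 1 can be sketched as follows: check how many bytes already arrived, seek the source to that offset, and append the rest. This is a simplified illustration rather than MobaXterm's own mechanism. It assumes the partial destination file is an exact prefix of the source (no integrity check is performed), and the function name resume_copy is hypothetical. The 64 KiB chunk size mirrors the buffer size suggested above.

```python
import os
from typing import BinaryIO


def resume_copy(src: BinaryIO, dest_path: str, chunk_size: int = 65536) -> int:
    """Append the part of src that dest_path is still missing.

    src must be a seekable binary file object. Returns the number of
    bytes written during this call.
    """
    already = os.path.getsize(dest_path) if os.path.exists(dest_path) else 0
    src.seek(already)

    written = 0
    with open(dest_path, "ab") as dest:
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            dest.write(chunk)
            written += len(chunk)
    return written
```

The same pattern should carry over to a remote source, such as a file object opened over SFTP, as long as that object supports seek and read. Comparing checksums after the transfer is a sensible extra step because the prefix assumption is not verified here.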

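# Problem 3: Disorganized sessions
Solutions:
1. Organize with folders: group sessions by project or environment
2. Color coding: give each environment its own tab color
3. Auto save: enable the Auto save session feature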

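# Problem 4: Automation scripts are hard to maintain
Solutions:
1. Modular design: split functionality into independent modules
2. Version control: manage scripts with Git
3. Documentation: give every script a detailed README
4. Unit tests: write test cases to keep behavior stable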
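
Items 1 and 4 work well together: pulling a small pure function out of a larger script makes it easy to test. A minimal sketch follows, using the success-rate calculation that the deployment reports rely on. The helper name success_rate is hypothetical, and returning 0.0 for an empty result set is an assumption about the desired behavior.

```python
import unittest
from typing import Dict


def success_rate(results: Dict[str, dict]) -> float:
    """Return the percentage of targets whose result has success=True.

    Returns 0.0 when there are no results, which avoids a division by zero.
    """
    if not results:
        return 0.0
    succeeded = sum(1 for r in results.values() if r.get("success"))
    return succeeded / len(results) * 100


class SuccessRateTest(unittest.TestCase):
    def test_empty_results(self):
        self.assertEqual(success_rate({}), 0.0)

    def test_mixed_results(self):
        results = {"web1": {"success": True}, "web2": {"success": False}}
        self.assertEqual(success_rate(results), 50.0)
```

Saved as a module, the tests can be run with python -m unittest, and the same helper could then be shared by the console, HTML, and PDF report code.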

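This article has covered advanced MobaXterm techniques for Linux operations through explanations and hands-on examples. From basic session management to a full automated deployment system, these tools and methods can noticeably improve operational efficiency and system reliability.

Remember that a tool is only as valuable as the way it is used. Start from real needs, begin with simple automation, and build up a complete operations workflow step by step. Keep an eye on new technology as well, and keep refining your existing workflows.

Recommended next steps

  • Dig deeper into Ansible for automated operations
  • Learn Kubernetes container orchestration
  • Get comfortable with the Prometheus monitoring system
  • Explore Infrastructure as Code (IaC) practices

Best of luck on your Linux operations journey!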
