Advanced MobaXterm Techniques and Automation Scripts for Linux Operations
1. MobaXterm Core Strengths and Architecture
1.1 Why use MobaXterm for Linux operations?
# MobaXterm core components
+-------------------------------------------+
|    Integrated environment (all-in-one)    |
|  +-------------------------------------+  |
|  | SSH client (multi-tab/session)      |  |
|  +-------------------------------------+  |
|  | X11 server (GUI app forwarding)     |  |
|  +-------------------------------------+  |
|  | SFTP file browser (drag-and-drop)   |  |
|  +-------------------------------------+  |
|  | Macro recorder (automation)         |  |
|  +-------------------------------------+  |
|  | Plugin system (extensions)          |  |
|  +-------------------------------------+  |
+-------------------------------------------+
1.2 Environment configuration tuning
# Key MobaXterm.ini settings
[MobaXterm]
# Session management
MaxSessions=50
StoreSessionsInRegistry=0
StoreSessionsInFolders=1
# Performance
AntiIdleInterval=60
ScrollbackLines=10000
ShareClipboard=1
# SSH enhancements
SSHKeepAlive=1
Compression=1
CompressionLevel=6
# Appearance
FontName=Consolas
FontSize=11
ColorsScheme=Ubuntu
Transparency=240
2. Advanced SSH Session Management
2.1 Batch server management
#!/bin/bash
# moba_batch_manager.sh
# Batch server configuration and operations script
SERVERS=(
"user1@192.168.1.10:22"
"user2@192.168.1.20:22"
"root@192.168.1.30:2222"
)
# 颜色输出定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# 批量执行命令函数
batch_execute() {
local command="$1"
echo -e "${YELLOW}[批量执行] ${command}${NC}"
echo "========================================"
for server in "${SERVERS[@]}"; do
local user=$(echo $server | cut -d'@' -f1)
local host=$(echo $server | cut -d'@' -f2 | cut -d':' -f1)
local port=$(echo $server | cut -d':' -f2)
echo -e "\n${GREEN}>>> 连接到 ${host}${NC}"
# Run the command with the OpenSSH client available in MobaXterm's local terminal.
# Key-based authentication is assumed; avoid hard-coding passwords in scripts.
ssh -p "${port:-22}" \
-o ConnectTimeout=30 \
-o BatchMode=yes \
"${user}@${host}" "$command"
if [ $? -eq 0 ]; then
echo -e "${GREEN}✓ ${host} 执行成功${NC}"
else
echo -e "${RED}✗ ${host} 执行失败${NC}"
fi
done
}
# 批量文件分发
batch_scp() {
local local_file="$1"
local remote_path="$2"
for server in "${SERVERS[@]}"; do
local user=$(echo "$server" | cut -d'@' -f1)
local host=$(echo "$server" | cut -d'@' -f2 | cut -d':' -f1)
local port=$(echo "$server" | cut -d':' -f2)
echo -e "\n${YELLOW}Copying file to ${host}${NC}"
# Upload with scp from MobaXterm's local terminal (key-based authentication assumed)
scp -P "${port:-22}" "$local_file" "${user}@${host}:${remote_path}"
done
}
# 主菜单
main_menu() {
while true; do
clear
echo "=== MobaXterm 批量运维管理 ==="
echo "1. 批量执行命令"
echo "2. 批量分发文件"
echo "3. 批量收集日志"
echo "4. 批量检查服务状态"
echo "5. 批量更新系统"
echo "6. 退出"
echo -n "请选择: "
read choice
case $choice in
1)
echo -n "请输入要执行的命令: "
read cmd
batch_execute "$cmd"
;;
2)
echo -n "请输入本地文件路径: "
read local_file
echo -n "请输入远程路径: "
read remote_path
batch_scp "$local_file" "$remote_path"
;;
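3)
# batch_scp only uploads, so pull each archive back with scp instead
log_name="logs_$(date +%Y%m%d).tar.gz"
batch_execute "tar -czf /tmp/${log_name} /var/log"
for server in "${SERVERS[@]}"; do
scp -P "${server##*:}" "${server%:*}:/tmp/${log_name}" "./${server%:*}_${log_name}"
done
;;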
4)
batch_execute "systemctl list-units --type=service --state=running"
;;
5)
batch_execute "apt update && apt upgrade -y"
;;
6)
exit 0
;;
*)
echo "无效选择"
;;
esac
echo -e "\n按任意键继续..."
read -n1
done
}
# 启动
main_menu
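The `SERVERS` entries above pack user, host, and port into a single `user@host:port` string that the script splits with `cut`. The same parsing can be sketched in Python, assuming the port is optional and falls back to 22. The `ServerSpec` type and `parse_server` helper are illustrative names, not part of MobaXterm.

```python
from typing import NamedTuple


class ServerSpec(NamedTuple):
    user: str
    host: str
    port: int


def parse_server(spec: str, default_port: int = 22) -> ServerSpec:
    """Split 'user@host[:port]' into its parts."""
    user, sep, rest = spec.partition("@")
    if not sep or not user or not rest:
        raise ValueError(f"expected user@host[:port], got {spec!r}")
    host, _, port_text = rest.partition(":")
    if not host:
        raise ValueError(f"missing host in {spec!r}")
    # An absent port falls back to the default instead of echoing the input
    port = int(port_text) if port_text else default_port
    return ServerSpec(user, host, port)


if __name__ == "__main__":
    for entry in ("user1@192.168.1.10:22", "root@192.168.1.30:2222", "ops@web01"):
        print(parse_server(entry))
```

One design note: `cut -d':' -f2` prints the whole line when no `:` is present, so the shell version only gets a usable port when every entry spells one out. The explicit fallback here avoids that.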
2.2 Advanced SSH tunneling and port forwarding
# MobaXterm tunnel configuration file (tunnels.xml)
<tunnels>
<tunnel name="MySQL远程访问" enabled="true">
<type>Local</type>
<listen>127.0.0.1:3307</listen>
<target>db-server:3306</target>
<server>jump-host</server>
<user>admin</user>
<auth>password</auth>
</tunnel>
<tunnel name="Web管理界面" enabled="true">
<type>Dynamic</type>
<listen>1080</listen>
<server>bastion-host</server>
<user>proxy-user</user>
<auth>keyfile:C:\Users\user\.ssh\id_rsa</auth>
</tunnel>
<tunnel name="K8s Dashboard" enabled="true">
<type>Local</type>
<listen>127.0.0.1:8001</listen>
<target>kubernetes-dashboard:443</target>
<server>k8s-master</server>
<user>k8s-admin</user>
<auth>password</auth>
</tunnel>
</tunnels>
# 自动化隧道管理脚本
# manage_tunnels.ps1
param(
[string]$Action = "start",
[string]$TunnelName = "all"
)
$MobaPath = "C:\Program Files (x86)\Mobatek\MobaXterm"
$TunnelConfig = Join-Path $MobaPath "tunnels.xml"
function Start-Tunnel {
param([string]$Name)
# 使用MobaXterm命令行启动隧道
& "$MobaPath\MobaXterm.exe" `
-newinstance `
-hideterm `
-bookmark2 "tunnel://$Name"
Write-Host "隧道 $Name 已启动" -ForegroundColor Green
}
function Stop-Tunnel {
param([string]$Name)
# 查找并终止隧道进程
$processes = Get-Process | Where-Object {
$_.ProcessName -eq "MobaXterm" -and
$_.MainWindowTitle -like "*$Name*"
}
foreach ($proc in $processes) {
$proc.Kill()
Write-Host "隧道 $Name 已停止" -ForegroundColor Yellow
}
}
function Monitor-Tunnels {
# 监控隧道状态
while ($true) {
Clear-Host
Write-Host "=== 隧道状态监控 ===" -ForegroundColor Cyan
Write-Host "时间: $(Get-Date -Format 'HH:mm:ss')"
Write-Host ""
# 检查本地端口监听
$listening = netstat -an | Select-String "LISTENING"
$tunnels = [xml](Get-Content $TunnelConfig)
foreach ($tunnel in $tunnels.tunnels.tunnel) {
$port = $tunnel.listen.Split(':')[-1]  # last element also works for port-only values such as "1080"
$status = if ($listening | Select-String ":$port ") {
"✓ 运行中"
} else {
"✗ 已停止"
}
Write-Host "$($tunnel.name) [$($tunnel.type)]" -NoNewline
Write-Host " - $status" -ForegroundColor $(if ($status -like "*运行中*") {"Green"} else {"Red"})
}
Start-Sleep -Seconds 5
}
}
# 执行操作
switch ($Action) {
"start" {
if ($TunnelName -eq "all") {
$tunnels = [xml](Get-Content $TunnelConfig)
foreach ($tunnel in $tunnels.tunnels.tunnel) {
if ($tunnel.enabled -eq "true") {
Start-Tunnel -Name $tunnel.name
}
}
} else {
Start-Tunnel -Name $TunnelName
}
}
"stop" {
Stop-Tunnel -Name $TunnelName
}
"monitor" {
Monitor-Tunnels
}
default {
Write-Host "用法: manage_tunnels.ps1 [start|stop|monitor] [隧道名|all]" -ForegroundColor Yellow
}
}
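The monitor function above infers tunnel state by searching `netstat` output for a listening port. A more direct check is to try connecting to the local end of each tunnel. This is a minimal sketch under that assumption; the port numbers come from the sample `tunnels.xml`, and `is_port_listening` is a hypothetical helper name.

```python
import socket


def is_port_listening(port: int, host: str = "127.0.0.1", timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False


if __name__ == "__main__":
    tunnels = {"MySQL tunnel": 3307, "SOCKS proxy": 1080, "K8s dashboard": 8001}
    for name, port in tunnels.items():
        state = "up" if is_port_listening(port) else "down"
        print(f"{name} (127.0.0.1:{port}): {state}")
```

A successful connect only shows that something accepts connections on the port. It does not prove the remote side of the tunnel is healthy.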
3. Automation Script Development in Practice
3.1 Recording MobaXterm macros and converting them
#!/usr/bin/env python3
# moba_macro_converter.py
# Convert a MobaXterm macro into a Python script
import re
import json
import argparse
from datetime import datetime
class MobaMacroConverter:
def __init__(self, macro_file):
self.macro_file = macro_file
self.commands = []
def parse_macro(self):
"""解析MobaXterm宏文件"""
with open(self.macro_file, 'r', encoding='utf-8') as f:
content = f.read()
# 提取命令序列
pattern = r'<command>(.*?)</command>'
self.commands = re.findall(pattern, content, re.DOTALL)
return self.commands
def convert_to_python(self, output_file=None):
"""转换为Python自动化脚本"""
template = '''#!/usr/bin/env python3
# 自动生成的MobaXterm运维脚本
# 生成时间: {timestamp}
# 源宏文件: {macro_file}
import paramiko
import time
import sys
import os
class MobaAutomation:
def __init__(self, host, username, password=None, key_file=None):
self.host = host
self.username = username
self.password = password
self.key_file = key_file
self.client = None
self.sftp = None
def connect(self):
"""建立SSH连接"""
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
if self.key_file:
key = paramiko.RSAKey.from_private_key_file(self.key_file)
self.client.connect(
self.host,
username=self.username,
pkey=key
)
else:
self.client.connect(
self.host,
username=self.username,
password=self.password
)
self.sftp = self.client.open_sftp()
print(f"[✓] 已连接到 {{self.host}}")
return True
except Exception as e:
print(f"[✗] 连接失败: {{e}}")
return False
def execute_command(self, command, timeout=30):
"""执行命令"""
print(f"[→] 执行: {{command}}")
stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
exit_code = stdout.channel.recv_exit_status()
output = stdout.read().decode('utf-8', errors='ignore')
error = stderr.read().decode('utf-8', errors='ignore')
if exit_code != 0:
print(f"[!] 命令执行失败 (exit={{exit_code}})")
if error:
print(f"错误输出: {{error}}")
else:
print(f"[✓] 命令执行成功")
return {{
'exit_code': exit_code,
'output': output,
'error': error
}}
def upload_file(self, local_path, remote_path):
"""上传文件"""
try:
self.sftp.put(local_path, remote_path)
print(f"[✓] 文件已上传: {{local_path}} -> {{remote_path}}")
return True
except Exception as e:
print(f"[✗] 上传失败: {{e}}")
return False
def download_file(self, remote_path, local_path):
"""下载文件"""
try:
self.sftp.get(remote_path, local_path)
print(f"[✓] 文件已下载: {{remote_path}} -> {{local_path}}")
return True
except Exception as e:
print(f"[✗] 下载失败: {{e}}")
return False
def close(self):
"""关闭连接"""
if self.sftp:
self.sftp.close()
if self.client:
self.client.close()
print("[✓] 连接已关闭")
# 自动化任务
def main():
# 配置连接信息
host = "your-server.com"
username = "your-username"
password = "your-password" # 或使用密钥
# 创建自动化实例
automator = MobaAutomation(host, username, password)
if not automator.connect():
sys.exit(1)
try:
# 自动生成的命令序列
{commands}
finally:
automator.close()
if __name__ == "__main__":
main()
'''
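        # Convert each macro command into lines of generated code.
        # Every generated line carries the 8-space indent of the try-block body
        # in the template, so the {commands} placeholder must sit at column 0.
        py_commands = []
        for i, cmd in enumerate(self.commands, 1):
            cmd = cmd.strip()
            if not cmd:
                continue
            # Special commands: file transfers
            if cmd.startswith('upload '):
                parts = cmd.split(' ', 2)
                if len(parts) >= 3:
                    py_commands.append(f'        # Command {i}: {cmd}')
                    py_commands.append(f'        automator.upload_file({parts[1]!r}, {parts[2]!r})')
            elif cmd.startswith('download '):
                parts = cmd.split(' ', 2)
                if len(parts) >= 3:
                    py_commands.append(f'        # Command {i}: {cmd}')
                    py_commands.append(f'        automator.download_file({parts[1]!r}, {parts[2]!r})')
            else:
                # Ordinary shell command; !r keeps embedded quotes valid in the output
                py_commands.append(f'        # Command {i}: {cmd}')
                py_commands.append(f'        result = automator.execute_command({cmd!r})')
                py_commands.append('        if result["exit_code"] != 0:')
                py_commands.append('            print("Task failed, stopping")')
                # "break" would be a SyntaxError here: the generated code is not in a loop
                py_commands.append('            return')
                py_commands.append('        time.sleep(1)')
        if not py_commands:
            py_commands.append('        pass  # the macro contained no commands')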
# 生成最终脚本
final_script = template.format(
timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
macro_file=self.macro_file,
commands='\n'.join(py_commands)
)
if output_file:
with open(output_file, 'w', encoding='utf-8') as f:
f.write(final_script)
print(f"[✓] Python脚本已生成: {output_file}")
else:
print(final_script)
return final_script
def main():
parser = argparse.ArgumentParser(description='MobaXterm宏转换工具')
parser.add_argument('macro', help='MobaXterm宏文件路径')
parser.add_argument('-o', '--output', help='输出Python文件路径')
parser.add_argument('-p', '--parse-only', action='store_true',
help='仅解析宏文件,不转换')
args = parser.parse_args()
converter = MobaMacroConverter(args.macro)
commands = converter.parse_macro()
if args.parse_only:
print("解析到的命令:")
for i, cmd in enumerate(commands, 1):
print(f"{i:3d}. {cmd}")
else:
converter.convert_to_python(args.output)
if __name__ == "__main__":
main()
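The converter has two separable steps: pulling command strings out of the macro text, and embedding each one in generated source code. Here is a minimal sketch of both. It assumes the `<command>...</command>` layout that `parse_macro` looks for, and real MobaXterm macro files may be laid out differently. `extract_commands` and `render_call` are illustrative names.

```python
import re


def extract_commands(macro_text: str) -> list[str]:
    """Return the non-empty <command> bodies found in the macro text."""
    found = re.findall(r"<command>(.*?)</command>", macro_text, re.DOTALL)
    return [cmd.strip() for cmd in found if cmd.strip()]


def render_call(cmd: str) -> str:
    """Render one generated line; repr() escapes quotes and backslashes."""
    return f"automator.execute_command({cmd!r})"


if __name__ == "__main__":
    sample = (
        "<macro>"
        '<command>echo "hello"</command>'
        "<command>   </command>"
        "<command>df -h</command>"
        "</macro>"
    )
    for command in extract_commands(sample):
        print(render_call(command))
```

Using `repr()` rather than wrapping the command in literal double quotes matters as soon as a recorded command contains a quote character of its own.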
3.2 An operations automation framework
#!/usr/bin/env python3
# moba_automation_framework.py
# Operations automation framework built around MobaXterm
import os
import sys
import json
import yaml
import logging
import paramiko
import threading
import queue
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
class MobaAutomationFramework:
"""MobaXterm自动化框架"""
def __init__(self, config_file='config.yaml'):
self.config = self.load_config(config_file)
self.setup_logging()
self.servers = self.config.get('servers', [])
self.tasks = self.config.get('tasks', {})
def load_config(self, config_file):
"""加载配置文件"""
config_path = Path(config_file)
if not config_path.exists():
# 创建默认配置
default_config = {
'servers': [
{
'name': 'web-server-1',
'host': '192.168.1.100',
'port': 22,
'username': 'admin',
'password': '', # 或使用 key_file
'key_file': '~/.ssh/id_rsa',
'groups': ['web', 'production']
}
],
'tasks': {
'system_health_check': {
'description': '系统健康检查',
'commands': [
'uptime',
'free -h',
'df -h',
'top -bn1 | head -20'
],
'parallel': True
},
'deploy_application': {
'description': '部署应用',
'steps': [
{'type': 'upload', 'src': 'app.tar.gz', 'dest': '/tmp/'},
{'type': 'command', 'cmd': 'tar -xzf /tmp/app.tar.gz -C /opt/app'},
{'type': 'command', 'cmd': 'systemctl restart app-service'}
]
}
}
}
with open(config_file, 'w') as f:
yaml.dump(default_config, f, default_flow_style=False)
return default_config
with open(config_file, 'r') as f:
return yaml.safe_load(f)
def setup_logging(self):
"""配置日志"""
log_dir = Path('logs')
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f'automation_{datetime.now().strftime("%Y%m%d")}.log'
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
    def execute_on_server(self, server, task):
        """Run a task on a single server."""
        results = {'server': server['name'], 'success': False, 'output': ''}
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            # key_filename lets paramiko detect the key type (RSA, Ed25519, ...)
            connect_args = {
                'port': server.get('port', 22),
                'username': server['username'],
            }
            if server.get('key_file'):
                connect_args['key_filename'] = os.path.expanduser(server['key_file'])
            else:
                connect_args['password'] = server.get('password')
            client.connect(server['host'], **connect_args)
            self.logger.info(f"Starting task on {server['name']}")
            # Treat a plain 'commands' list as a sequence of command steps
            if 'commands' in task:
                steps = [{'type': 'command', 'cmd': cmd} for cmd in task['commands']]
            else:
                steps = task.get('steps', [])
            failed = False
            for step in steps:
                if step['type'] == 'command':
                    stdin, stdout, stderr = client.exec_command(step['cmd'])
                    output = stdout.read().decode()
                    error = stderr.read().decode()
                    exit_code = stdout.channel.recv_exit_status()
                    results['output'] += f"$ {step['cmd']}\n{output}\n"
                    if exit_code != 0:
                        self.logger.error(f"Command failed: {step['cmd']}, error: {error}")
                        results['output'] += f"Error: {error}\n"
                        failed = True
                        break
                elif step['type'] == 'upload':
                    # sftp.put needs a full remote file path, not a directory
                    dest = step['dest']
                    if dest.endswith('/'):
                        dest += os.path.basename(step['src'])
                    sftp = client.open_sftp()
                    sftp.put(step['src'], dest)
                    sftp.close()
                    self.logger.info(f"File uploaded: {step['src']} -> {dest}")
            # A failed command must not be reported as a successful run
            results['success'] = not failed
            self.logger.info(f"Task finished on {server['name']}")
        except Exception as e:
            self.logger.error(f"Server {server['name']} failed: {e}")
            results['output'] = str(e)
        finally:
            client.close()
        return results
def run_task(self, task_name, server_filter=None):
"""运行任务"""
if task_name not in self.tasks:
self.logger.error(f"任务不存在: {task_name}")
return
task = self.tasks[task_name]
self.logger.info(f"开始执行任务: {task_name} - {task.get('description', '')}")
# 筛选服务器
target_servers = self.servers
if server_filter:
if isinstance(server_filter, list):
target_servers = [s for s in self.servers if s['name'] in server_filter]
elif isinstance(server_filter, dict):
# 按组筛选
if 'groups' in server_filter:
target_servers = [
s for s in self.servers
if any(g in s.get('groups', []) for g in server_filter['groups'])
]
results = []
# 并行执行
if task.get('parallel', False):
with ThreadPoolExecutor(max_workers=10) as executor:
future_to_server = {
executor.submit(self.execute_on_server, server, task): server
for server in target_servers
}
for future in as_completed(future_to_server):
server = future_to_server[future]
try:
result = future.result()
results.append(result)
except Exception as e:
self.logger.error(f"执行异常: {e}")
results.append({
'server': server['name'],
'success': False,
'output': str(e)
})
else:
# 串行执行
for server in target_servers:
result = self.execute_on_server(server, task)
results.append(result)
# 生成报告
self.generate_report(task_name, results)
# 统计结果
success_count = sum(1 for r in results if r['success'])
total_count = len(results)
self.logger.info(f"任务完成: {success_count}/{total_count} 服务器成功")
return results
def generate_report(self, task_name, results):
"""生成执行报告"""
report_dir = Path('reports')
report_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = report_dir / f'report_{task_name}_{timestamp}.html'
# 生成HTML报告
html = f'''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>运维自动化报告 - {task_name}</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.summary {{ background: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 20px; }}
.success {{ color: green; }}
.failed {{ color: red; }}
.server {{ border: 1px solid #ddd; margin: 10px 0; padding: 10px; border-radius: 3px; }}
pre {{ background: #f8f8f8; padding: 10px; border-radius: 3px; overflow: auto; }}
</style>
</head>
<body>
<h1>运维自动化报告</h1>
<div class="summary">
<h2>任务: {task_name}</h2>
<p>执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<p>服务器总数: {len(results)}</p>
<p>成功: <span class="success">{sum(1 for r in results if r['success'])}</span></p>
<p>失败: <span class="failed">{sum(1 for r in results if not r['success'])}</span></p>
</div>
'''
for result in results:
status_class = 'success' if result['success'] else 'failed'
status_text = '成功' if result['success'] else '失败'
html += f'''
<div class="server">
<h3>服务器: {result['server']} - <span class="{status_class}">{status_text}</span></h3>
<pre>{result.get('output', '无输出')}</pre>
</div>
'''
html += '''
</body>
</html>
'''
with open(report_file, 'w', encoding='utf-8') as f:
f.write(html)
self.logger.info(f"报告已生成: {report_file}")
return report_file
def main():
"""CLI入口点"""
import argparse
parser = argparse.ArgumentParser(description='MobaXterm自动化运维框架')
parser.add_argument('task', help='要执行的任务名称')
parser.add_argument('-c', '--config', default='config.yaml', help='配置文件路径')
parser.add_argument('-s', '--servers', nargs='+', help='指定服务器列表')
parser.add_argument('-g', '--groups', nargs='+', help='按服务器组筛选')
args = parser.parse_args()
# 初始化框架
framework = MobaAutomationFramework(args.config)
# 构建服务器筛选条件
server_filter = None
if args.servers:
server_filter = args.servers
elif args.groups:
server_filter = {'groups': args.groups}
# 执行任务
framework.run_task(args.task, server_filter)
if __name__ == "__main__":
main()
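`run_task` accepts either a list of server names or a `{'groups': [...]}` dictionary as its filter. The selection logic can be isolated as a pure function, which makes it easy to check without any SSH connection. This sketch assumes the server dictionaries from the default configuration (`name`, `groups`); `filter_servers` is a hypothetical helper, not part of the framework above.

```python
def filter_servers(servers, names=None, groups=None):
    """Select servers by explicit name, or by membership in any listed group.

    Names take precedence over groups, mirroring the -s / -g handling in main().
    With no filter, every server is returned.
    """
    if names:
        wanted = set(names)
        return [s for s in servers if s["name"] in wanted]
    if groups:
        wanted = set(groups)
        return [s for s in servers if wanted & set(s.get("groups", []))]
    return list(servers)


if __name__ == "__main__":
    inventory = [
        {"name": "web-server-1", "groups": ["web", "production"]},
        {"name": "db-server-1", "groups": ["db", "production"]},
        {"name": "staging-1", "groups": ["web", "staging"]},
    ]
    print([s["name"] for s in filter_servers(inventory, groups=["production"])])
    print([s["name"] for s in filter_servers(inventory, names=["staging-1"])])
```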
4. Integration and Monitoring
4.1 Integrating with a CI/CD pipeline
# .gitlab-ci.yml example
stages:
  - deploy
  - verify
  - rollback
variables:
  MOBA_CONFIG: "production_servers.yaml"
deploy_production:
  stage: deploy
  script:
    - echo "Starting deployment to production"
    - python moba_automation_framework.py deploy_application -c $MOBA_CONFIG -g production
  only:
    - master
  artifacts:
    paths:
      - reports/*.html
health_check:
  stage: verify
  script:
    - echo "Running health check"
    - python moba_automation_framework.py system_health_check -c $MOBA_CONFIG -g production
  dependencies:
    - deploy_production
  artifacts:
    paths:
      - reports/*.html
rollback_if_failed:
  stage: rollback
  script:
    - echo "Deployment failure detected, rolling back"
    - python moba_automation_framework.py rollback_application -c $MOBA_CONFIG -g production
  when: on_failure
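The `rollback_if_failed` job runs only when a job in an earlier stage fails, and GitLab decides that from the exit status of the job's script. For the pipeline to react to failed servers, the framework's result list has to be turned into an exit code. This is a small sketch assuming the `{'server', 'success', 'output'}` result shape returned by `run_task`; `exit_code_for` is a hypothetical helper.

```python
def exit_code_for(results) -> int:
    """Return 0 only when every server reported success, otherwise 1."""
    if not results:
        # Treat "nothing ran" (unknown task, empty server list) as a failure
        # so the pipeline does not pass silently.
        return 1
    return 0 if all(r.get("success") for r in results) else 1


if __name__ == "__main__":
    demo = [
        {"server": "web-server-1", "success": True, "output": "ok"},
        {"server": "web-server-2", "success": False, "output": "timeout"},
    ]
    print(f"exit code: {exit_code_for(demo)}")
```

In the framework's `main()`, this would be wired in as `sys.exit(exit_code_for(framework.run_task(args.task, server_filter)))`.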
4.2 Real-time monitoring dashboard
#!/usr/bin/env python3
# moba_monitor_dashboard.py
# Real-time monitoring dashboard
from flask import Flask, render_template, jsonify
import paramiko  # used by ServerMonitor.update_monitor_data
from datetime import datetime
import threading
import time
from moba_automation_framework import MobaAutomationFramework
app = Flask(__name__)
framework = MobaAutomationFramework()
class ServerMonitor:
def __init__(self, update_interval=60):
self.update_interval = update_interval
self.monitor_data = {}
self.running = False
def start_monitoring(self):
"""启动监控线程"""
self.running = True
monitor_thread = threading.Thread(target=self._monitor_loop)
monitor_thread.daemon = True
monitor_thread.start()
def _monitor_loop(self):
"""监控循环"""
while self.running:
try:
self.update_monitor_data()
time.sleep(self.update_interval)
except Exception as e:
print(f"监控出错: {e}")
def update_monitor_data(self):
"""更新监控数据"""
for server in framework.servers:
try:
# 连接到服务器获取状态
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if server.get('key_file'):
key = paramiko.RSAKey.from_private_key_file(server['key_file'])
client.connect(
server['host'],
username=server['username'],
pkey=key
)
else:
client.connect(
server['host'],
username=server['username'],
password=server.get('password')
)
# 获取系统状态
commands = {
'cpu': "top -bn1 | grep 'Cpu(s)'",
'memory': "free -m | grep Mem",
'disk': "df -h / | tail -1",
'uptime': "uptime",
'connections': "netstat -an | grep ESTABLISHED | wc -l"
}
server_data = {'timestamp': datetime.now().isoformat()}
for key, cmd in commands.items():
stdin, stdout, stderr = client.exec_command(cmd)
output = stdout.read().decode().strip()
server_data[key] = output
client.close()
# 计算健康分数
server_data['health_score'] = self.calculate_health_score(server_data)
self.monitor_data[server['name']] = server_data
except Exception as e:
self.monitor_data[server['name']] = {
'error': str(e),
'health_score': 0,
'timestamp': datetime.now().isoformat()
}
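    def calculate_health_score(self, data):
        """Compute a 0-100 health score from the collected metrics."""
        import re  # local import keeps this method self-contained
        score = 100
        # CPU usage: derive it from the idle percentage reported by top,
        # e.g. "%Cpu(s):  3.1 us,  1.2 sy, ... 95.0 id, ..."
        if 'cpu' in data:
            match = re.search(r'(\d+(?:\.\d+)?)\s*%?\s*id', data['cpu'])
            if match:
                cpu_usage = 100.0 - float(match.group(1))
                # Check the stricter threshold first, otherwise it is unreachable
                if cpu_usage > 95:
                    score -= 50
                elif cpu_usage > 80:
                    score -= 20
        # Memory usage from "free -m": "Mem: total used free ..."
        if 'memory' in data:
            try:
                mem_parts = data['memory'].split()
                total = int(mem_parts[1])
                used = int(mem_parts[2])
                mem_usage = (used / total) * 100
                if mem_usage > 95:
                    score -= 50
                elif mem_usage > 80:
                    score -= 20
            except (IndexError, ValueError, ZeroDivisionError):
                # Unparseable output leaves the score unchanged
                pass
        return max(0, score)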
monitor = ServerMonitor()
monitor.start_monitoring()
@app.route('/')
def dashboard():
"""主监控页面"""
return render_template('dashboard.html')
@app.route('/api/monitor')
def get_monitor_data():
"""获取监控数据API"""
return jsonify(monitor.monitor_data)
@app.route('/api/servers')
def get_servers():
"""获取服务器列表"""
servers_info = []
for server in framework.servers:
server_info = {
'name': server['name'],
'host': server['host'],
'groups': server.get('groups', []),
'last_status': monitor.monitor_data.get(server['name'], {})
}
servers_info.append(server_info)
return jsonify(servers_info)
@app.route('/api/task/run/<task_name>')
def run_task(task_name):
"""运行任务API"""
try:
results = framework.run_task(task_name)
return jsonify({
'success': True,
'results': results
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
})
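if __name__ == '__main__':
    # Bind to localhost and keep debug off: the Werkzeug debugger allows
    # arbitrary code execution and must never be reachable from the network
    app.run(debug=False, host='127.0.0.1', port=5000)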
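The monitor collects `df -h /` output for every server, but the health score only looks at CPU and memory. Disk usage could be folded in along the following lines. This sketch assumes the usual `df` layout with a `Use%` column, and its thresholds simply mirror the 80/95 cut-offs used for CPU and memory. Both function names are illustrative.

```python
def disk_usage_percent(df_line: str):
    """Return the Use% column of a df output line as an int, or None."""
    for token in df_line.split():
        if token.endswith("%") and token[:-1].isdigit():
            return int(token[:-1])
    return None


def disk_penalty(percent) -> int:
    """Points to subtract from the health score for a given disk usage."""
    if percent is None:
        return 0  # unparseable output leaves the score unchanged
    if percent > 95:
        return 50
    if percent > 80:
        return 20
    return 0


if __name__ == "__main__":
    line = "/dev/sda1        50G   43G  4.5G  91% /"
    usage = disk_usage_percent(line)
    print(f"disk usage: {usage}%, penalty: {disk_penalty(usage)}")
```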
5. Security and Best Practices
5.1 Security configuration guide
#!/bin/bash
# security_hardening.sh
# MobaXterm security hardening script
echo "Starting MobaXterm security hardening..."
# 1. 配置文件权限加固
chmod 600 ~/MobaXterm.ini
chmod 700 ~/MobaXterm/sessions/
# 2. 会话加密
echo "启用会话加密..."
cat > ~/MobaXterm/sec_config.ini << EOF
[Security]
EncryptPasswords=1
EncryptionKey=$(openssl rand -base64 32)
SessionTimeout=300
MaxFailedAttempts=3
EOF
# 3. SSH密钥管理
echo "设置SSH密钥..."
mkdir -p ~/.ssh
chmod 700 ~/.ssh
# 生成新的SSH密钥对
ssh-keygen -t ed25519 -f ~/.ssh/moba_key -N "" -q
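# 4. Restrict SSH agent forwarding (append so an existing config is preserved)
cat >> ~/.ssh/config << EOF
Host *
ForwardAgent no
AddKeysToAgent yes
ControlMaster auto
ControlPath ~/.ssh/%r@%h:%p
ControlPersist 600
# Stricter settings for production servers.
# UseKeychain is macOS-only, and PermitRootLogin / MaxAuthTries are sshd_config
# (server-side) options that the ssh client rejects; set those on the servers.
Host production-*
PasswordAuthentication no
PubkeyAuthentication yes
EOF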
# 5. 设置MobaXterm自动锁定
cat >> ~/MobaXterm.ini << EOF
[Security]
AutoLock=1
LockTimeout=300
RequirePasswordOnResume=1
EOF
# 6. 审计日志配置
echo "启用审计日志..."
mkdir -p ~/MobaXterm/logs/audit
cat > ~/MobaXterm/audit.conf << EOF
[Audit]
EnableCommandLogging=1
EnableSessionLogging=1
LogDirectory=~/MobaXterm/logs/audit
RetentionDays=90
LogRotateSize=100M
SensitiveCommandAlert=1
EOF
# 7. 配置防火墙规则
echo "配置Windows防火墙..."
netsh advfirewall firewall add rule name="MobaXterm SSH" dir=in action=allow protocol=TCP localport=22 enable=yes profile=any
# 8. 禁用危险功能
cat >> ~/MobaXterm.ini << EOF
[Restrictions]
DisableTunnelCreation=0
DisableFileTransfer=0
DisablePluginInstallation=1
AllowUnsafeConnections=0
EOF
# 9. 备份配置文件
echo "备份配置文件..."
backup_dir="$HOME/MobaXterm/backup/$(date +%Y%m%d_%H%M%S)"  # ~ is not expanded inside double quotes
mkdir -p "$backup_dir"
cp ~/MobaXterm.ini "$backup_dir/"
cp -r ~/MobaXterm/sessions/ "$backup_dir/" 2>/dev/null || true
# 10. 创建安全报告
cat > "安全加固报告.txt" << EOF
=== MobaXterm 安全加固报告 ===
完成时间: $(date)
加固项目:
1. 配置文件权限设置完成
2. 会话加密已启用
3. SSH密钥对已生成: ~/.ssh/moba_key
4. SSH配置已更新
5. 自动锁定功能已启用
6. 审计日志已配置
7. 防火墙规则已添加
8. 危险功能已限制
9. 配置文件已备份到: $backup_dir
安全建议:
- 定期更改SSH密钥
- 查看审计日志: ~/MobaXterm/logs/audit/
- 启用双因素认证
- 定期更新MobaXterm版本
EOF
echo "安全加固完成!"
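The script sets permissions with `chmod` but never verifies them afterwards. A quick audit can be sketched in Python, assuming POSIX-style mode bits as seen from MobaXterm's Cygwin-based local terminal or on a Linux host. Under native Windows Python the mode bits carry little meaning, so treat the result there as indicative only. `is_private` is a hypothetical helper name.

```python
import os
import stat


def is_private(path: str) -> bool:
    """Return True when neither group nor others have any access to path."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return (mode & 0o077) == 0


if __name__ == "__main__":
    candidates = [
        os.path.expanduser("~/.ssh"),
        os.path.expanduser("~/.ssh/moba_key"),
        os.path.expanduser("~/MobaXterm.ini"),
    ]
    for candidate in candidates:
        if not os.path.exists(candidate):
            print(f"{candidate}: not found")
        elif is_private(candidate):
            print(f"{candidate}: ok")
        else:
            print(f"{candidate}: readable by group or others")
```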
5.2 Performance tuning
# performance_optimization.ps1
# MobaXterm performance tuning script
param(
[switch]$Apply,
[switch]$Revert,
[switch]$Test
)
$MobaPath = "$env:USERPROFILE\Documents\MobaXterm"
$IniFile = "$MobaPath\MobaXterm.ini"
$BackupFile = "$MobaPath\MobaXterm.ini.backup"
function Optimize-Performance {
Write-Host "开始优化MobaXterm性能..." -ForegroundColor Cyan
# 备份原配置
if (Test-Path $IniFile) {
Copy-Item $IniFile $BackupFile -Force
Write-Host "配置文件已备份到: $BackupFile" -ForegroundColor Green
}
# 性能优化配置
$optimizations = @"
[MobaXterm]
; 网络优化
TCPKeepAlive=1
CompressionLevel=9
UseDNS=0
AutoReconnect=1
ReconnectDelay=5
; 内存和CPU优化
MaxScrollbackLines=5000
UseBackgroundThread=1
AntialiasedText=1
DisableAnimation=1
; 渲染优化
DoubleBuffering=1
UseDirectWrite=1
FontQuality=CLEARTYPE_NATURAL_QUALITY
; 会话管理优化
SessionCacheSize=50
EnableSessionCompression=1
AutoSaveSession=1
SaveSessionInterval=300
; 文件传输优化
SFTPBufferSize=65536
ParallelTransfers=4
TransferRetryCount=3
ResumeBrokenTransfers=1
; 显示优化
DisableBanner=1
HideMenuBar=0
UseTabs=1
TabColorMode=1
; 终端性能
TypeAhead=1
DisableAltF4=0
EnableX11Forwarding=1
X11UseLocalhost=1
"@
# 应用优化配置
Add-Content -Path $IniFile -Value $optimizations -Encoding UTF8
Write-Host "性能优化配置已应用" -ForegroundColor Green
# Create the performance test script. A single-quoted here-string keeps
# $(Get-Date), $testFile and $_ literal, so they are evaluated when the test runs
# rather than when this file is generated.
$testScript = @'
# MobaXterm performance test
echo "=== Performance test started ==="
echo "Test time: $(Get-Date)"
# Network connectivity test
echo "`n1. Network connectivity test:"
Test-NetConnection -ComputerName github.com -Port 443 | Select-Object ComputerName, RemotePort, TcpTestSucceeded
# SSH connection test
echo "`n2. SSH connection test:"
Measure-Command { ssh localhost "echo 'SSH connection OK'" } | Select-Object TotalSeconds
# File transfer speed test
echo "`n3. File transfer test:"
$testFile = [System.IO.Path]::GetTempFileName()
1..1000 | ForEach-Object { "test data $_" } | Set-Content $testFile
Measure-Command { scp $testFile localhost:/tmp/ } | Select-Object TotalSeconds
Remove-Item $testFile
# X11 forwarding test
echo "`n4. X11 forwarding test:"
if (Test-Path "C:\Windows\System32\calc.exe") {
Measure-Command {
ssh -X localhost "xcalc &"
Start-Sleep 2
Get-Process xcalc* | Stop-Process
} | Select-Object TotalSeconds
}
echo "`n=== Performance test finished ==="
'@
Set-Content -Path "$MobaPath\performance_test.ps1" -Value $testScript -Encoding UTF8
Write-Host "性能测试脚本已创建: $MobaPath\performance_test.ps1" -ForegroundColor Green
}
function Test-Performance {
Write-Host "运行性能测试..." -ForegroundColor Cyan
& "$MobaPath\performance_test.ps1"
# 基准对比
$benchmarks = @{
"SSH连接延迟" = "应 < 0.5秒"
"文件传输速度" = "应 > 1 MB/s"
"X11启动时间" = "应 < 3秒"
"内存使用" = "应 < 100 MB"
}
Write-Host "`n=== 性能基准 ===" -ForegroundColor Yellow
$benchmarks.GetEnumerator() | ForEach-Object {
Write-Host "$($_.Key): $($_.Value)" -ForegroundColor White
}
}
function Revert-Changes {
if (Test-Path $BackupFile) {
Write-Host "恢复原始配置..." -ForegroundColor Yellow
Copy-Item $BackupFile $IniFile -Force
Write-Host "配置已恢复" -ForegroundColor Green
} else {
Write-Host "未找到备份文件" -ForegroundColor Red
}
}
# 主逻辑
if ($Apply) {
Optimize-Performance
Test-Performance
} elseif ($Revert) {
Revert-Changes
} elseif ($Test) {
Test-Performance
} else {
Write-Host "使用方法:" -ForegroundColor Yellow
Write-Host " .\performance_optimization.ps1 -Apply # 应用优化并测试"
Write-Host " .\performance_optimization.ps1 -Revert # 恢复原始配置"
Write-Host " .\performance_optimization.ps1 -Test # 仅运行性能测试"
}
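`Optimize-Performance` appends a whole new `[MobaXterm]` block to the INI file, so running it twice leaves duplicate sections and keys. An alternative is to merge the keys into the existing section. The sketch below does this with Python's `configparser`, assuming the file is plain INI that `configparser` can parse. Rewriting the file this way drops comments, so keep the backup step. `merge_ini_settings` is a hypothetical helper.

```python
import configparser
import tempfile
from pathlib import Path


def merge_ini_settings(ini_path, section, settings):
    """Set keys inside one section, creating the section only if it is missing."""
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    parser.optionxform = str  # keep key case exactly as written
    parser.read(ini_path, encoding="utf-8")
    if not parser.has_section(section):
        parser.add_section(section)
    for key, value in settings.items():
        parser.set(section, key, str(value))
    with open(ini_path, "w", encoding="utf-8") as handle:
        parser.write(handle, space_around_delimiters=False)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ini = Path(tmp) / "MobaXterm.ini"
        ini.write_text("[MobaXterm]\nFontSize=11\n", encoding="utf-8")
        # Applying the same settings twice must not duplicate anything
        for _ in range(2):
            merge_ini_settings(ini, "MobaXterm", {"FontSize": 12, "UseTabs": 1})
        print(ini.read_text(encoding="utf-8"))
```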
6. Advanced Plugin Development
6.1 Custom plugin development guide
#!/usr/bin/env python3
# moba_custom_plugin.py
# Custom MobaXterm plugin development
"""
MobaXterm插件结构:
MobaXterm/
├── plugins/
│ ├── MyPlugin/
│ │ ├── plugin.ini # 插件配置文件
│ │ ├── main.py # 主程序
│ │ ├── ui.xml # 用户界面定义
│ │ └── resources/ # 资源文件
│ └── ThirdPartyPlugins/ # 第三方插件目录
"""
import configparser
import json
import os
import sys
import tkinter as tk
from tkinter import ttk, messagebox
from pathlib import Path
class MobaPlugin:
"""MobaXterm插件基类"""
def __init__(self, plugin_path):
self.plugin_path = Path(plugin_path)
self.config = self.load_config()
self.ui_elements = {}
def load_config(self):
"""加载插件配置"""
config_file = self.plugin_path / "plugin.ini"
config = configparser.ConfigParser()
config.read(config_file)
return config
def create_gui(self):
"""创建插件界面"""
self.root = tk.Tk()
self.root.title(self.config.get('Plugin', 'Name', fallback='MobaXterm插件'))
# 设置窗口大小和位置
width = self.config.getint('UI', 'Width', fallback=600)
height = self.config.getint('UI', 'Height', fallback=400)
self.root.geometry(f"{width}x{height}")
# 创建主框架
self.create_main_frame()
# 绑定事件
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
return self.root
def create_main_frame(self):
"""创建主框架"""
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 标题
title = ttk.Label(
main_frame,
text=self.config.get('Plugin', 'Name'),
font=("Arial", 16, "bold")
)
title.grid(row=0, column=0, columnspan=2, pady=(0, 20))
# 服务器列表
self.create_server_list(main_frame)
# 操作按钮
self.create_action_buttons(main_frame)
# 日志输出
self.create_log_output(main_frame)
# 配置网格权重
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
main_frame.columnconfigure(1, weight=1)
main_frame.rowconfigure(3, weight=1)
def create_server_list(self, parent):
"""创建服务器列表"""
ttk.Label(parent, text="服务器列表:").grid(row=1, column=0, sticky=tk.W, pady=(0, 5))
# 服务器列表框
frame = ttk.Frame(parent)
frame.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
# 服务器树形列表
self.server_tree = ttk.Treeview(frame, columns=('status', 'name', 'host'), show='tree headings')
self.server_tree.heading('#0', text='组')
self.server_tree.heading('status', text='状态')
self.server_tree.heading('name', text='名称')
self.server_tree.heading('host', text='地址')
# 设置列宽
self.server_tree.column('#0', width=100)
self.server_tree.column('status', width=60)
self.server_tree.column('name', width=120)
self.server_tree.column('host', width=150)
# 滚动条
scrollbar = ttk.Scrollbar(frame, orient=tk.VERTICAL, command=self.server_tree.yview)
self.server_tree.configure(yscrollcommand=scrollbar.set)
self.server_tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
frame.columnconfigure(0, weight=1)
frame.rowconfigure(0, weight=1)
# 加载服务器数据
self.load_servers()
def load_servers(self):
"""加载服务器数据"""
# 从MobaXterm会话文件加载
sessions_path = Path.home() / "Documents" / "MobaXterm" / "sessions"
if sessions_path.exists():
for session_file in sessions_path.glob("*.mx*"):
try:
with open(session_file, 'r') as f:
content = f.read()
# 解析会话文件
name = session_file.stem
host = self.extract_host(content)
group = self.extract_group(content)
# 添加到树形列表
if group not in self.server_tree.get_children():
self.server_tree.insert('', 'end', group, text=group)
self.server_tree.insert(
group, 'end',
values=('●', name, host)
)
except:
continue
def extract_host(self, content):
"""从会话内容提取主机信息"""
import re
match = re.search(r'Host=([^\n]+)', content)
return match.group(1) if match else "N/A"
def extract_group(self, content):
"""从会话内容提取组信息"""
import re
match = re.search(r'Group=([^\n]+)', content)
return match.group(1) if match else "默认组"
def create_action_buttons(self, parent):
"""创建操作按钮"""
button_frame = ttk.Frame(parent)
button_frame.grid(row=3, column=0, columnspan=2, pady=(10, 0))
actions = [
("连接", self.connect_server),
("断开", self.disconnect_server),
("监控", self.monitor_server),
("执行命令", self.execute_command),
("传输文件", self.transfer_file),
("生成报告", self.generate_report)
]
for i, (text, command) in enumerate(actions):
btn = ttk.Button(button_frame, text=text, command=command)
btn.grid(row=0, column=i, padx=2)
def create_log_output(self, parent):
"""创建日志输出区域"""
ttk.Label(parent, text="操作日志:").grid(row=4, column=0, sticky=tk.W, pady=(10, 5))
log_frame = ttk.Frame(parent)
log_frame.grid(row=5, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
# 日志文本框
self.log_text = tk.Text(log_frame, height=10, wrap=tk.WORD)
log_scrollbar = ttk.Scrollbar(log_frame, orient=tk.VERTICAL, command=self.log_text.yview)
self.log_text.configure(yscrollcommand=log_scrollbar.set)
self.log_text.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
log_scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
log_frame.columnconfigure(0, weight=1)
log_frame.rowconfigure(0, weight=1)
def log_message(self, message, level="INFO"):
"""记录日志消息"""
import datetime
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
formatted = f"[{timestamp}] [{level}] {message}\n"
self.log_text.insert(tk.END, formatted)
self.log_text.see(tk.END)
# 颜色编码
if level == "ERROR":
self.log_text.tag_add("error", "end-2l", "end-1l")
self.log_text.tag_config("error", foreground="red")
elif level == "WARNING":
self.log_text.tag_add("warning", "end-2l", "end-1l")
self.log_text.tag_config("warning", foreground="orange")
def connect_server(self):
"""连接选中的服务器"""
selection = self.server_tree.selection()
if not selection:
messagebox.showwarning("警告", "请先选择一个服务器")
return
item = selection[0]
values = self.server_tree.item(item, 'values')
if values:
server_name = values[1]
self.log_message(f"正在连接到服务器: {server_name}")
# 实际连接逻辑...
def disconnect_server(self):
"""断开服务器连接"""
self.log_message("断开服务器连接", "INFO")
def monitor_server(self):
"""监控服务器状态"""
self.log_message("开始监控服务器", "INFO")
def execute_command(self):
"""执行远程命令"""
# 创建命令输入对话框
dialog = tk.Toplevel(self.root)
dialog.title("执行命令")
dialog.geometry("400x200")
ttk.Label(dialog, text="输入要执行的命令:").pack(pady=10)
cmd_entry = ttk.Entry(dialog, width=50)
cmd_entry.pack(pady=5)
def execute():
command = cmd_entry.get()
if command:
self.log_message(f"执行命令: {command}")
# 执行远程命令逻辑...
dialog.destroy()
ttk.Button(dialog, text="执行", command=execute).pack(pady=10)
def transfer_file(self):
"""传输文件"""
self.log_message("文件传输功能", "INFO")
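    def generate_report(self):
        """Generate a sample report and open it in the browser."""
        import webbrowser
        from datetime import datetime
        self.log_message("Generating operations report", "INFO")
        generated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Build a sample report
        report_html = f"""
<html>
<head><meta charset="utf-8"><title>Operations Report</title></head>
<body>
<h1>MobaXterm Plugin Operations Report</h1>
<p>Generated at: {generated_at}</p>
</body>
</html>
"""
        report_file = Path(self.plugin_path) / "report.html"
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write(report_html)
        # as_uri() produces a valid file:// URL on Windows paths as well
        webbrowser.open(report_file.resolve().as_uri())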
def on_close(self):
"""关闭插件窗口"""
if messagebox.askokcancel("退出", "确定要退出插件吗?"):
self.root.destroy()
def run(self):
"""运行插件"""
self.create_gui()
self.root.mainloop()
# 插件配置文件模板
def create_plugin_template(plugin_name):
"""创建插件模板"""
plugin_dir = Path.home() / "Documents" / "MobaXterm" / "plugins" / plugin_name
plugin_dir.mkdir(parents=True, exist_ok=True)
# 创建plugin.ini
plugin_ini = f"""[Plugin]
Name = {plugin_name}
Version = 1.0.0
Author = Your Name
Description = 这是一个MobaXterm插件示例
Icon = icon.ico
[UI]
Width = 800
Height = 600
[Settings]
AutoStart = 0
RefreshInterval = 60
"""
(plugin_dir / "plugin.ini").write_text(plugin_ini, encoding='utf-8')
# 创建主程序文件
    main_py = Path(__file__).read_text(encoding='utf-8')
(plugin_dir / "main.py").write_text(main_py, encoding='utf-8')
# 创建空资源目录
(plugin_dir / "resources").mkdir(exist_ok=True)
print(f"插件模板已创建到: {plugin_dir}")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "create":
plugin_name = sys.argv[2] if len(sys.argv) > 2 else "MyPlugin"
create_plugin_template(plugin_name)
else:
# 运行插件
plugin_path = Path(__file__).parent
plugin = MobaPlugin(plugin_path)
plugin.run()
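The report step in the plugin above writes an HTML file and then passes a `file://` URL to the browser. The following is a minimal sketch of that step as a standalone function. The helper name `write_report` and its parameters are assumptions made for illustration and are not part of the plugin. It relies on `Path.as_uri()` from the standard library, which builds a correctly escaped URL from an absolute path, including Windows drive-letter paths.

```python
import tempfile
from datetime import datetime
from pathlib import Path


def write_report(report_dir: Path, title: str = "Operations Report") -> str:
    """Write a small HTML report and return its file:// URL.

    Hypothetical helper for illustration; not part of the plugin listing.
    """
    report_dir.mkdir(parents=True, exist_ok=True)
    generated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    html = (
        "<html>\n"
        f'<head><meta charset="utf-8"><title>{title}</title></head>\n'
        "<body>\n"
        f"<h1>{title}</h1>\n"
        f"<p>Generated at: {generated_at}</p>\n"
        "</body>\n"
        "</html>\n"
    )
    report_file = report_dir / "report.html"
    # An explicit encoding keeps non-ASCII titles intact on every platform
    report_file.write_text(html, encoding="utf-8")
    # as_uri() requires an absolute path, hence resolve()
    return report_file.resolve().as_uri()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(write_report(Path(tmp)))
```

The returned string can be handed directly to `webbrowser.open()`.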
7. Hands-On Case Study: An Automated Deployment System
7.1 A Complete Automated Deployment System
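The deployment system in this section runs shell commands on remote hosts, and several of them embed paths taken from the configuration, such as the timestamped `cp -r` backup. The sketch below shows one way to quote those paths with `shlex.quote` from the standard library so that spaces or shell metacharacters in a path cannot alter the command. The helper name `build_backup_command` is an assumption for illustration and does not appear in the listing.

```python
import shlex
from datetime import datetime
from typing import Optional


def build_backup_command(target_path: str, timestamp: Optional[str] = None) -> str:
    """Return a `cp -r` command that copies target_path to a timestamped backup.

    Hypothetical helper for illustration. Both paths are quoted so that the
    remote shell treats each of them as a single argument.
    """
    if timestamp is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = f"{target_path}_backup_{timestamp}"
    return f"cp -r {shlex.quote(target_path)} {shlex.quote(backup_path)}"


if __name__ == "__main__":
    # A plain path is left unquoted; a path with a space gets single quotes
    print(build_backup_command("/opt/webapp", "20240101_000000"))
    print(build_backup_command("/opt/my app", "20240101_000000"))
```

The resulting string can be passed to `exec_command` in place of a command assembled by plain string interpolation.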
#!/usr/bin/env python3
# moba_deployment_system.py
# Automated deployment system built around MobaXterm
import os
import sys
import json
import yaml
import paramiko
import hashlib
import threading
import queue
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Any
from enum import Enum
from concurrent.futures import ThreadPoolExecutor, as_completed
class DeploymentStatus(Enum):
PENDING = "pending"
PREPARING = "preparing"
DEPLOYING = "deploying"
VERIFYING = "verifying"
COMPLETED = "completed"
FAILED = "failed"
ROLLED_BACK = "rolled_back"
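@dataclass
class DeploymentTarget:
    name: str
    host: str
    port: int = 22
    username: str = "root"
    groups: Optional[List[str]] = None
    pre_deploy_scripts: Optional[List[str]] = None
    post_deploy_scripts: Optional[List[str]] = None
    # Optional credentials that connect_to_target() looks up. Prefer key_file:
    # a password set here is written to the saved state file in plain text.
    key_file: Optional[str] = None
    password: Optional[str] = None

    def __post_init__(self):
        # Replace None with fresh lists so instances never share a mutable default
        if self.groups is None:
            self.groups = []
        if self.pre_deploy_scripts is None:
            self.pre_deploy_scripts = []
        if self.post_deploy_scripts is None:
            self.post_deploy_scripts = []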
@dataclass
class DeploymentArtifact:
name: str
version: str
source_path: str
target_path: str
    checksum: Optional[str] = None
backup_path: Optional[str] = None
def calculate_checksum(self):
"""计算文件校验和"""
if os.path.exists(self.source_path):
hash_md5 = hashlib.md5()
with open(self.source_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
self.checksum = hash_md5.hexdigest()
return self.checksum
@dataclass
class DeploymentStep:
name: str
type: str # command, upload, download, backup, rollback
command: Optional[str] = None
source: Optional[str] = None
target: Optional[str] = None
timeout: int = 300
retry_count: int = 3
rollback_command: Optional[str] = None
class DeploymentSystem:
"""自动化部署系统"""
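    def __init__(self, config_path: str = "deployment_config.yaml"):
        self.config_path = Path(config_path)
        self.config = self.load_config()
        self.deployments = {}
        self.setup_logging()
        # Reload saved states so that a task created by one CLI run ("create")
        # is still known to a later run ("execute", "status", "rollback").
        state_dir = Path("deployment_states")
        if state_dir.is_dir():
            for state_file in state_dir.glob("*.json"):
                with open(state_file, 'r', encoding='utf-8') as f:
                    state = json.load(f)
                self.deployments[state['id']] = state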
def load_config(self) -> Dict:
"""加载部署配置"""
if not self.config_path.exists():
# 创建示例配置
sample_config = {
'deployment': {
'max_parallel': 5,
'timeout': 1800,
'backup_enabled': True,
'backup_retention': 7,
'notification': {
'enabled': True,
'webhook': 'https://hooks.slack.com/services/...'
}
},
'artifacts': [
{
'name': 'webapp',
'source': './dist/webapp.tar.gz',
'target': '/opt/webapp'
}
],
'targets': [
{
'name': 'web-server-1',
'host': '192.168.1.100',
'groups': ['web', 'production']
}
],
'pipeline': {
'pre_deploy': [
{'type': 'command', 'cmd': 'systemctl stop nginx'}
],
'deploy': [
{'type': 'upload', 'src': './dist/webapp.tar.gz', 'dest': '/tmp/'},
{'type': 'command', 'cmd': 'tar -xzf /tmp/webapp.tar.gz -C /opt/webapp'}
],
'post_deploy': [
{'type': 'command', 'cmd': 'systemctl start nginx'},
{'type': 'command', 'cmd': 'systemctl status nginx'}
],
'rollback': [
{'type': 'command', 'cmd': 'systemctl stop nginx'},
{'type': 'command', 'cmd': 'cp -r /opt/webapp_backup /opt/webapp'},
{'type': 'command', 'cmd': 'systemctl start nginx'}
]
}
}
with open(self.config_path, 'w') as f:
yaml.dump(sample_config, f, default_flow_style=False)
return sample_config
with open(self.config_path, 'r') as f:
return yaml.safe_load(f)
def setup_logging(self):
"""配置日志系统"""
log_dir = Path("deployment_logs")
log_dir.mkdir(exist_ok=True)
self.log_file = log_dir / f"deployment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler(self.log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def create_deployment(self, artifact: DeploymentArtifact,
targets: List[DeploymentTarget],
description: str = "") -> str:
"""创建部署任务"""
deployment_id = hashlib.md5(
f"{artifact.name}-{artifact.version}-{datetime.now()}".encode()
).hexdigest()[:8]
deployment = {
'id': deployment_id,
'artifact': asdict(artifact),
'targets': [asdict(t) for t in targets],
'description': description,
'status': DeploymentStatus.PENDING.value,
'created_at': datetime.now().isoformat(),
'steps': [],
'results': {}
}
self.deployments[deployment_id] = deployment
self.save_deployment_state(deployment_id)
self.logger.info(f"创建部署任务: {deployment_id} - {artifact.name} v{artifact.version}")
return deployment_id
def execute_deployment(self, deployment_id: str):
"""执行部署任务"""
deployment = self.deployments[deployment_id]
deployment['status'] = DeploymentStatus.DEPLOYING.value
deployment['started_at'] = datetime.now().isoformat()
self.logger.info(f"开始执行部署: {deployment_id}")
targets = deployment['targets']
artifact = deployment['artifact']
# 并行执行部署
max_parallel = self.config['deployment'].get('max_parallel', 5)
with ThreadPoolExecutor(max_workers=max_parallel) as executor:
future_to_target = {}
for target in targets:
future = executor.submit(
self.deploy_to_target,
target,
artifact,
deployment_id
)
future_to_target[future] = target['name']
# 收集结果
for future in as_completed(future_to_target):
target_name = future_to_target[future]
try:
result = future.result()
deployment['results'][target_name] = result
if result['success']:
self.logger.info(f"部署成功到 {target_name}")
else:
self.logger.error(f"部署失败到 {target_name}: {result.get('error')}")
except Exception as e:
self.logger.error(f"部署异常到 {target_name}: {e}")
deployment['results'][target_name] = {
'success': False,
'error': str(e)
}
# 检查整体状态
success_count = sum(1 for r in deployment['results'].values() if r.get('success'))
total_count = len(targets)
if success_count == total_count:
deployment['status'] = DeploymentStatus.COMPLETED.value
self.logger.info(f"部署完成: {success_count}/{total_count} 成功")
else:
deployment['status'] = DeploymentStatus.FAILED.value
self.logger.error(f"部署失败: {success_count}/{total_count} 成功")
# 自动回滚
if self.config['deployment'].get('auto_rollback', True):
self.logger.info("开始自动回滚...")
self.rollback_deployment(deployment_id)
deployment['completed_at'] = datetime.now().isoformat()
self.save_deployment_state(deployment_id)
# 发送通知
self.send_notification(deployment)
return deployment
def deploy_to_target(self, target: Dict, artifact: Dict, deployment_id: str) -> Dict:
"""部署到单个目标"""
result = {
'target': target['name'],
'success': False,
'steps': [],
'start_time': datetime.now().isoformat()
}
try:
# 建立SSH连接
ssh_client = self.connect_to_target(target)
if not ssh_client:
raise Exception(f"无法连接到 {target['name']}")
# 执行预部署脚本
self.logger.info(f"执行预部署脚本到 {target['name']}")
for script in target.get('pre_deploy_scripts', []):
step_result = self.execute_step(ssh_client, script)
result['steps'].append(step_result)
if not step_result['success']:
ssh_client.close()
result['error'] = f"预部署脚本失败: {script}"
return result
# 执行部署流水线
pipeline = self.config['pipeline']
# 备份当前版本
if self.config['deployment'].get('backup_enabled', True):
backup_step = self.create_backup_step(artifact)
backup_result = self.execute_step(ssh_client, backup_step)
result['steps'].append(backup_result)
# 执行部署步骤
for step_config in pipeline.get('deploy', []):
step = DeploymentStep(
name=step_config.get('name', 'deploy_step'),
type=step_config['type'],
command=step_config.get('cmd'),
source=step_config.get('src'),
target=step_config.get('dest')
)
step_result = self.execute_step(ssh_client, step)
result['steps'].append(step_result)
if not step_result['success']:
ssh_client.close()
result['error'] = f"部署步骤失败: {step.name}"
return result
# 执行后部署脚本
self.logger.info(f"执行后部署脚本到 {target['name']}")
for script in target.get('post_deploy_scripts', []):
step_result = self.execute_step(ssh_client, script)
result['steps'].append(step_result)
if not step_result['success']:
ssh_client.close()
result['error'] = f"后部署脚本失败: {script}"
return result
# 验证部署
if self.config['deployment'].get('verification_enabled', True):
verify_result = self.verify_deployment(ssh_client, artifact)
result['steps'].append(verify_result)
if not verify_result['success']:
ssh_client.close()
result['error'] = "部署验证失败"
return result
ssh_client.close()
result['success'] = True
result['end_time'] = datetime.now().isoformat()
except Exception as e:
result['error'] = str(e)
result['end_time'] = datetime.now().isoformat()
return result
def connect_to_target(self, target: Dict) -> Optional[paramiko.SSHClient]:
"""连接到目标服务器"""
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # Supported authentication: key file, password, or paramiko defaults
            connect_kwargs = {
                'port': target.get('port', 22),
                'username': target['username'],
            }
            if target.get('key_file'):
                # key_filename lets paramiko detect the key type (RSA, Ed25519, ...)
                connect_kwargs['key_filename'] = os.path.expanduser(target['key_file'])
            elif target.get('password'):
                connect_kwargs['password'] = target['password']
            # With neither set, paramiko tries the SSH agent and default keys in ~/.ssh
            client.connect(target['host'], **connect_kwargs)
return client
except Exception as e:
self.logger.error(f"连接失败 {target['name']}: {e}")
return None
def execute_step(self, ssh_client: paramiko.SSHClient, step) -> Dict:
"""执行部署步骤"""
step_result = {
'step': step.name if hasattr(step, 'name') else str(step),
'type': step.type if hasattr(step, 'type') else 'command',
'success': False,
'output': '',
'start_time': datetime.now().isoformat()
}
try:
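            if step_result['type'] == 'command':
                # Run the command; honor the step timeout when one is set
                cmd = step.command if hasattr(step, 'command') else step
                timeout = getattr(step, 'timeout', None)
                stdin, stdout, stderr = ssh_client.exec_command(cmd, timeout=timeout)
                # Read the output before waiting for the exit status: waiting first
                # can block when the command writes more than the channel buffers
                output = stdout.read().decode('utf-8', errors='ignore')
                error_output = stderr.read().decode('utf-8', errors='ignore')
                exit_code = stdout.channel.recv_exit_status()
                step_result['exit_code'] = exit_code
                step_result['output'] = output
                step_result['success'] = (exit_code == 0)
                if exit_code != 0:
                    step_result['error'] = error_output
            elif step_result['type'] == 'upload':
                # Upload a file. sftp.put() needs a full remote file path, so
                # append the file name when the target is a directory such as /tmp/
                remote_path = step.target
                if remote_path.endswith('/'):
                    remote_path += os.path.basename(step.source)
                sftp = ssh_client.open_sftp()
                sftp.put(step.source, remote_path)
                sftp.close()
                step_result['success'] = True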
elif step_result['type'] == 'download':
# 下载文件
sftp = ssh_client.open_sftp()
sftp.get(step.source, step.target)
sftp.close()
step_result['success'] = True
elif step_result['type'] == 'backup':
# 创建备份
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_cmd = f"cp -r {step.target} {step.target}_backup_{timestamp}"
stdin, stdout, stderr = ssh_client.exec_command(backup_cmd)
exit_code = stdout.channel.recv_exit_status()
step_result['success'] = (exit_code == 0)
except Exception as e:
step_result['error'] = str(e)
step_result['end_time'] = datetime.now().isoformat()
return step_result
def create_backup_step(self, artifact: Dict) -> DeploymentStep:
"""创建备份步骤"""
return DeploymentStep(
name="backup_current_version",
type="command",
command=f"cp -r {artifact['target_path']} {artifact['target_path']}_backup_$(date +%Y%m%d_%H%M%S)"
)
def verify_deployment(self, ssh_client: paramiko.SSHClient, artifact: Dict) -> Dict:
"""验证部署结果"""
verify_steps = [
DeploymentStep(
name="check_file_exists",
type="command",
command=f"ls -la {artifact['target_path']}"
),
DeploymentStep(
name="check_service_status",
type="command",
command="systemctl is-active nginx || echo 'Service check skipped'"
),
DeploymentStep(
name="verify_checksum",
type="command",
command=f"find {artifact['target_path']} -type f -exec md5sum {{}} \\; | sort"
)
]
verify_result = {
'name': 'deployment_verification',
'type': 'verification',
'success': True,
'steps': [],
'start_time': datetime.now().isoformat()
}
for step in verify_steps:
step_result = self.execute_step(ssh_client, step)
verify_result['steps'].append(step_result)
if not step_result['success']:
verify_result['success'] = False
verify_result['error'] = f"验证失败: {step.name}"
break
verify_result['end_time'] = datetime.now().isoformat()
return verify_result
    def rollback_deployment(self, deployment_id: str):
"""回滚部署"""
deployment = self.deployments[deployment_id]
self.logger.info(f"开始回滚部署: {deployment_id}")
deployment['status'] = DeploymentStatus.ROLLED_BACK.value
deployment['rollback_started_at'] = datetime.now().isoformat()
targets = deployment['targets']
rollback_steps = self.config['pipeline'].get('rollback', [])
for target in targets:
try:
ssh_client = self.connect_to_target(target)
if not ssh_client:
continue
# 执行回滚步骤
for step_config in rollback_steps:
step = DeploymentStep(
name=step_config.get('name', 'rollback_step'),
type=step_config['type'],
command=step_config.get('cmd'),
source=step_config.get('src'),
target=step_config.get('dest')
)
step_result = self.execute_step(ssh_client, step)
if not step_result['success']:
self.logger.error(f"回滚失败 {target['name']}: {step.name}")
break
ssh_client.close()
except Exception as e:
self.logger.error(f"回滚异常 {target['name']}: {e}")
deployment['rollback_completed_at'] = datetime.now().isoformat()
self.save_deployment_state(deployment_id)
self.logger.info(f"回滚完成: {deployment_id}")
def save_deployment_state(self, deployment_id: str):
"""保存部署状态"""
state_dir = Path("deployment_states")
state_dir.mkdir(exist_ok=True)
state_file = state_dir / f"{deployment_id}.json"
with open(state_file, 'w') as f:
json.dump(self.deployments[deployment_id], f, indent=2, default=str)
def send_notification(self, deployment: Dict):
"""发送部署通知"""
if not self.config['deployment'].get('notification', {}).get('enabled', False):
return
import requests
webhook_url = self.config['deployment']['notification'].get('webhook')
if not webhook_url:
return
status = deployment['status']
artifact = deployment['artifact']
targets = deployment['targets']
# 构建通知消息
message = {
"text": f"部署通知: {artifact['name']} v{artifact['version']}",
"attachments": [{
"color": "good" if status == "completed" else "danger",
"fields": [
{"title": "部署ID", "value": deployment.get('id', 'N/A'), "short": True},
{"title": "状态", "value": status, "short": True},
{"title": "版本", "value": artifact['version'], "short": True},
{"title": "目标服务器", "value": str(len(targets)), "short": True},
{"title": "开始时间", "value": deployment.get('started_at', 'N/A'), "short": True},
{"title": "完成时间", "value": deployment.get('completed_at', 'N/A'), "short": True}
]
}]
}
try:
            response = requests.post(webhook_url, json=message, timeout=10)
if response.status_code == 200:
self.logger.info("通知发送成功")
else:
self.logger.warning(f"通知发送失败: {response.status_code}")
except Exception as e:
self.logger.error(f"通知发送异常: {e}")
class DeploymentCLI:
"""部署系统命令行界面"""
def __init__(self):
self.system = DeploymentSystem()
def run(self):
"""运行CLI"""
import argparse
parser = argparse.ArgumentParser(description="MobaXterm自动化部署系统")
subparsers = parser.add_subparsers(dest="command", help="可用命令")
# 创建部署命令
create_parser = subparsers.add_parser("create", help="创建部署任务")
create_parser.add_argument("--artifact", required=True, help="部署制品路径")
create_parser.add_argument("--version", required=True, help="版本号")
create_parser.add_argument("--targets", nargs="+", help="目标服务器列表")
create_parser.add_argument("--groups", nargs="+", help="目标服务器组")
create_parser.add_argument("--description", help="部署描述")
# 执行部署命令
execute_parser = subparsers.add_parser("execute", help="执行部署任务")
execute_parser.add_argument("deployment_id", help="部署任务ID")
# 查看状态命令
status_parser = subparsers.add_parser("status", help="查看部署状态")
status_parser.add_argument("deployment_id", nargs="?", help="部署任务ID")
# 回滚命令
rollback_parser = subparsers.add_parser("rollback", help="回滚部署")
rollback_parser.add_argument("deployment_id", help="部署任务ID")
# 列表命令
subparsers.add_parser("list", help="列出所有部署任务")
# 报告命令
report_parser = subparsers.add_parser("report", help="生成部署报告")
report_parser.add_argument("deployment_id", help="部署任务ID")
report_parser.add_argument("--format", choices=["html", "json", "pdf"], default="html")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
if args.command == "create":
self.create_deployment(args)
elif args.command == "execute":
self.execute_deployment(args.deployment_id)
elif args.command == "status":
self.show_status(args.deployment_id)
elif args.command == "rollback":
self.rollback_deployment(args.deployment_id)
elif args.command == "list":
self.list_deployments()
elif args.command == "report":
self.generate_report(args.deployment_id, args.format)
def create_deployment(self, args):
"""创建部署任务"""
artifact = DeploymentArtifact(
name=os.path.basename(args.artifact),
version=args.version,
source_path=args.artifact,
target_path="/opt/" + os.path.basename(args.artifact).replace('.tar.gz', '')
)
artifact.calculate_checksum()
# 选择目标服务器
targets = []
config_targets = self.system.config.get('targets', [])
if args.targets:
for target_name in args.targets:
target = next((t for t in config_targets if t['name'] == target_name), None)
if target:
targets.append(DeploymentTarget(**target))
elif args.groups:
for target in config_targets:
if any(g in args.groups for g in target.get('groups', [])):
targets.append(DeploymentTarget(**target))
else:
# 使用所有目标
for target in config_targets:
targets.append(DeploymentTarget(**target))
if not targets:
print("错误: 未找到目标服务器")
return
deployment_id = self.system.create_deployment(
artifact=artifact,
targets=targets,
description=args.description or ""
)
print(f"部署任务已创建:")
print(f" ID: {deployment_id}")
print(f" 制品: {artifact.name} v{artifact.version}")
print(f" 目标服务器: {len(targets)} 台")
print(f" 使用命令执行: moba_deployment_system.py execute {deployment_id}")
def execute_deployment(self, deployment_id: str):
"""执行部署"""
if deployment_id not in self.system.deployments:
print(f"错误: 部署任务 {deployment_id} 不存在")
return
print(f"开始执行部署: {deployment_id}")
print("=" * 50)
deployment = self.system.execute_deployment(deployment_id)
print("\n部署完成!")
print(f"状态: {deployment['status']}")
print(f"开始时间: {deployment.get('started_at')}")
print(f"完成时间: {deployment.get('completed_at')}")
# 显示结果统计
results = deployment.get('results', {})
success = sum(1 for r in results.values() if r.get('success'))
total = len(results)
print(f"\n结果统计: {success}/{total} 成功")
for target_name, result in results.items():
status = "✓" if result.get('success') else "✗"
print(f" {status} {target_name}: {result.get('error', '成功')}")
def show_status(self, deployment_id: str = None):
"""显示部署状态"""
if deployment_id:
if deployment_id not in self.system.deployments:
print(f"错误: 部署任务 {deployment_id} 不存在")
return
deployment = self.system.deployments[deployment_id]
self.print_deployment_details(deployment)
else:
# 显示所有部署
state_dir = Path("deployment_states")
if state_dir.exists():
print(f"{'ID':<10} {'状态':<12} {'制品':<20} {'版本':<10} {'创建时间':<20}")
print("-" * 80)
for state_file in state_dir.glob("*.json"):
with open(state_file, 'r') as f:
deployment = json.load(f)
print(f"{deployment.get('id', 'N/A')[:8]:<10} "
f"{deployment.get('status', 'N/A'):<12} "
f"{deployment.get('artifact', {}).get('name', 'N/A')[:18]:<20} "
f"{deployment.get('artifact', {}).get('version', 'N/A')[:8]:<10} "
f"{deployment.get('created_at', 'N/A')[:19]:<20}")
else:
print("暂无部署记录")
def print_deployment_details(self, deployment: Dict):
"""打印部署详情"""
print(f"部署ID: {deployment.get('id')}")
print(f"描述: {deployment.get('description', '无')}")
print(f"状态: {deployment.get('status')}")
print(f"创建时间: {deployment.get('created_at')}")
print(f"开始时间: {deployment.get('started_at', '未开始')}")
print(f"完成时间: {deployment.get('completed_at', '未完成')}")
artifact = deployment.get('artifact', {})
print(f"\n制品信息:")
print(f" 名称: {artifact.get('name')}")
print(f" 版本: {artifact.get('version')}")
print(f" 路径: {artifact.get('target_path')}")
print(f" 校验和: {artifact.get('checksum', '未计算')}")
targets = deployment.get('targets', [])
print(f"\n目标服务器 ({len(targets)} 台):")
for target in targets:
print(f" - {target.get('name')} ({target.get('host')})")
results = deployment.get('results', {})
if results:
print(f"\n执行结果:")
success = sum(1 for r in results.values() if r.get('success'))
total = len(results)
print(f" 成功率: {success}/{total} ({success/total*100:.1f}%)")
for target_name, result in results.items():
status = "成功" if result.get('success') else "失败"
duration = ""
if result.get('start_time') and result.get('end_time'):
from datetime import datetime
start = datetime.fromisoformat(result['start_time'])
end = datetime.fromisoformat(result['end_time'])
duration = f" ({end - start})"
print(f" {target_name}: {status}{duration}")
if not result.get('success'):
print(f" 错误: {result.get('error', '未知错误')}")
def rollback_deployment(self, deployment_id: str):
"""回滚部署"""
print(f"确认回滚部署 {deployment_id}? (y/N)")
confirm = input().lower()
if confirm != 'y':
print("已取消")
return
self.system.rollback_deployment(deployment_id)
print(f"部署 {deployment_id} 已回滚")
def list_deployments(self):
"""列出所有部署"""
self.show_status()
def generate_report(self, deployment_id: str, format: str = "html"):
"""生成部署报告"""
if deployment_id not in self.system.deployments:
print(f"错误: 部署任务 {deployment_id} 不存在")
return
deployment = self.system.deployments[deployment_id]
if format == "html":
self.generate_html_report(deployment)
elif format == "json":
self.generate_json_report(deployment)
elif format == "pdf":
self.generate_pdf_report(deployment)
def generate_html_report(self, deployment: Dict):
"""生成HTML报告"""
from datetime import datetime
report_dir = Path("reports")
report_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = report_dir / f"deployment_{deployment['id']}_{timestamp}.html"
# 构建HTML内容
html = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>部署报告 - {deployment['id']}</title>
<style>
body {{ font-family: 'Microsoft YaHei', Arial, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; }}
.container {{ max-width: 1200px; margin: 0 auto; background: white; padding: 30px; border-radius: 10px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }}
h1 {{ color: #333; border-bottom: 3px solid #4CAF50; padding-bottom: 10px; }}
h2 {{ color: #555; margin-top: 30px; }}
.summary {{ background: #e8f5e9; padding: 20px; border-radius: 5px; margin: 20px 0; }}
.status-success, .status-completed {{ color: #4CAF50; font-weight: bold; }}
.status-failed {{ color: #f44336; font-weight: bold; }}
.status-pending, .status-rolled_back {{ color: #ff9800; font-weight: bold; }}
.table {{ width: 100%; border-collapse: collapse; margin: 20px 0; }}
.table th, .table td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
.table th {{ background: #4CAF50; color: white; }}
.table tr:nth-child(even) {{ background: #f9f9f9; }}
.table tr:hover {{ background: #f1f1f1; }}
.badge {{ display: inline-block; padding: 3px 8px; border-radius: 12px; font-size: 12px; color: white; }}
.badge-success {{ background: #4CAF50; }}
.badge-failed {{ background: #f44336; }}
.badge-warning {{ background: #ff9800; }}
.timeline {{ position: relative; margin: 30px 0; }}
.timeline::before {{ content: ''; position: absolute; left: 50px; top: 0; bottom: 0; width: 2px; background: #4CAF50; }}
.timeline-item {{ position: relative; margin: 20px 0; padding-left: 80px; }}
.timeline-time {{ position: absolute; left: 0; width: 80px; text-align: right; padding-right: 15px; font-weight: bold; color: #666; }}
.timeline-content {{ background: #e8f5e9; padding: 15px; border-radius: 5px; }}
.chart-container {{ width: 100%; height: 300px; margin: 30px 0; }}
</style>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
<div class="container">
<h1>🏗️ 部署执行报告</h1>
<div class="summary">
<h2>📋 部署概览</h2>
<p><strong>部署ID:</strong> {deployment['id']}</p>
<p><strong>状态:</strong> <span class="status-{deployment['status']}">{deployment['status'].upper()}</span></p>
<p><strong>描述:</strong> {deployment.get('description', '无')}</p>
<p><strong>创建时间:</strong> {deployment.get('created_at', 'N/A')}</p>
<p><strong>执行时间:</strong> {deployment.get('started_at', 'N/A')} - {deployment.get('completed_at', 'N/A')}</p>
</div>
<h2>📦 制品信息</h2>
<table class="table">
<tr><th>名称</th><th>版本</th><th>目标路径</th><th>校验和</th></tr>
<tr>
<td>{deployment.get('artifact', {}).get('name', 'N/A')}</td>
<td>{deployment.get('artifact', {}).get('version', 'N/A')}</td>
<td>{deployment.get('artifact', {}).get('target_path', 'N/A')}</td>
<td><code>{(deployment.get('artifact', {}).get('checksum') or 'N/A')[:16]}...</code></td>
</tr>
</table>
<h2>🎯 目标服务器</h2>
<table class="table">
<tr><th>名称</th><th>主机</th><th>用户</th><th>组</th><th>状态</th></tr>
"""
# 添加目标服务器行
results = deployment.get('results', {})
for target in deployment.get('targets', []):
target_name = target.get('name')
result = results.get(target_name, {})
status_class = "success" if result.get('success') else "failed"
status_text = "成功" if result.get('success') else "失败"
html += f"""
<tr>
<td>{target_name}</td>
<td>{target.get('host')}</td>
<td>{target.get('username', 'N/A')}</td>
<td>{', '.join(target.get('groups', []))}</td>
<td><span class="badge badge-{status_class}">{status_text}</span></td>
</tr>
"""
html += """
</table>
<h2>📊 执行统计</h2>
<div class="chart-container">
<canvas id="resultChart"></canvas>
</div>
<h2>⏱️ 时间线</h2>
<div class="timeline">
"""
# 添加时间线项目
timeline_events = [
("创建", deployment.get('created_at')),
("开始", deployment.get('started_at')),
("完成", deployment.get('completed_at')),
("回滚开始", deployment.get('rollback_started_at')),
("回滚完成", deployment.get('rollback_completed_at'))
]
for event, time in timeline_events:
if time:
html += f"""
<div class="timeline-item">
<div class="timeline-time">{time[11:19]}</div>
<div class="timeline-content">{event}</div>
</div>
"""
html += """
</div>
<h2>📝 详细结果</h2>
"""
# 添加详细结果
for target_name, result in results.items():
status_class = "success" if result.get('success') else "failed"
html += f"""
<div style="margin: 20px 0; padding: 15px; border-left: 4px solid #{'4CAF50' if result.get('success') else 'f44336'}; background: #f9f9f9;">
<h3>{target_name} - <span class="badge badge-{status_class}">{"成功" if result.get('success') else "失败"}</span></h3>
<p><strong>执行时间:</strong> {result.get('start_time', 'N/A')} - {result.get('end_time', 'N/A')}</p>
"""
if not result.get('success'):
html += f"<p><strong>错误:</strong> {result.get('error', '未知错误')}</p>"
# 显示步骤
steps = result.get('steps', [])
if steps:
html += "<h4>执行步骤:</h4><ul>"
for step in steps:
step_status = "✓" if step.get('success') else "✗"
html += f"<li>{step_status} {step.get('step', 'N/A')}</li>"
html += "</ul>"
html += "</div>"
# 添加图表脚本
html += """
<script>
// 执行结果统计
const results = """ + json.dumps(list(results.values())) + """;
const successCount = results.filter(r => r.success).length;
const failedCount = results.filter(r => !r.success).length;
const ctx = document.getElementById('resultChart').getContext('2d');
new Chart(ctx, {
type: 'doughnut',
data: {
labels: ['成功', '失败'],
datasets: [{
data: [successCount, failedCount],
backgroundColor: ['#4CAF50', '#f44336'],
borderWidth: 1
}]
},
options: {
responsive: true,
plugins: {
legend: { position: 'top' },
title: {
display: true,
text: '部署结果分布'
}
}
}
});
// 添加打印功能
document.addEventListener('keydown', function(e) {
if (e.ctrlKey && e.key === 'p') {
e.preventDefault();
window.print();
}
});
</script>
"""
html += """
<div style="margin-top: 40px; padding-top: 20px; border-top: 1px solid #ddd; text-align: center; color: #777; font-size: 12px;">
报告生成时间: """ + datetime.now().strftime('%Y-%m-%d %H:%M:%S') + """<br>
生成工具: MobaXterm自动化部署系统
</div>
</div>
</body>
</html>
"""
with open(report_file, 'w', encoding='utf-8') as f:
f.write(html)
print(f"HTML报告已生成: {report_file}")
        # Try to open the report automatically
        try:
            import webbrowser
            webbrowser.open(report_file.absolute().as_uri())
        except Exception:
            pass
def generate_json_report(self, deployment: Dict):
"""生成JSON报告"""
report_dir = Path("reports")
report_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = report_dir / f"deployment_{deployment['id']}_{timestamp}.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(deployment, f, indent=2, default=str)
print(f"JSON报告已生成: {report_file}")
def generate_pdf_report(self, deployment: Dict):
"""生成PDF报告"""
try:
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib import colors
from reportlab.lib.units import inch
report_dir = Path("reports")
report_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = report_dir / f"deployment_{deployment['id']}_{timestamp}.pdf"
# 创建PDF文档
doc = SimpleDocTemplate(str(report_file), pagesize=letter)
styles = getSampleStyleSheet()
story = []
# 标题
title_style = styles['Title']
title_style.alignment = 1 # 居中
story.append(Paragraph("部署执行报告", title_style))
story.append(Spacer(1, 20))
# 部署概览
story.append(Paragraph("部署概览", styles['Heading1']))
overview_data = [
["部署ID:", deployment['id']],
["状态:", deployment['status'].upper()],
["描述:", deployment.get('description', '无')],
["创建时间:", deployment.get('created_at', 'N/A')],
["执行时间:", f"{deployment.get('started_at', 'N/A')} - {deployment.get('completed_at', 'N/A')}"]
]
overview_table = Table(overview_data, colWidths=[1.5*inch, 4*inch])
overview_table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.black),
('ALIGN', (0, 0), (-1, -1), 'LEFT'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, -1), 10),
('BOTTOMPADDING', (0, 0), (-1, -1), 6),
('TOPPADDING', (0, 0), (-1, -1), 6),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(overview_table)
story.append(Spacer(1, 20))
# Artifact information
story.append(Paragraph("Artifact Information", styles['Heading1']))
artifact = deployment.get('artifact', {})
artifact_data = [
    ["Name", "Version", "Target Path"],
    [artifact.get('name', 'N/A'), artifact.get('version', 'N/A'), artifact.get('target_path', 'N/A')]
]
artifact_table = Table(artifact_data, colWidths=[2*inch, 1.5*inch, 2.5*inch])
artifact_table.setStyle(TableStyle([
    ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
    ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
    ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
    ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
    ('FONTSIZE', (0, 0), (-1, -1), 10),
    ('BOTTOMPADDING', (0, 0), (-1, -1), 12),
    ('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(artifact_table)
story.append(Spacer(1, 20))
# Execution results
story.append(Paragraph("Execution Results", styles['Heading1']))
results = deployment.get('results', {})
results_data = [["Server", "Status", "Start Time", "End Time", "Error"]]
for target_name, result in results.items():
    status = "Success" if result.get('success') else "Failed"
    results_data.append([
        target_name,
        status,
        # `or` also covers keys that are present but set to None
        str(result.get('start_time') or 'N/A')[:19],
        str(result.get('end_time') or 'N/A')[:19],
        str(result.get('error') or '')[:50]
    ])
if len(results_data) > 1:
    results_table = Table(results_data, colWidths=[1.5*inch, 1*inch, 1.5*inch, 1.5*inch, 2*inch])
    results_table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, -1), 8),
        ('BOTTOMPADDING', (0, 0), (-1, -1), 8),
        ('GRID', (0, 0), (-1, -1), 1, colors.black),
        ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.whitesmoke, colors.lightgrey])
    ]))
    story.append(results_table)
else:
    story.append(Paragraph("No execution results", styles['Normal']))
story.append(Spacer(1, 20))
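# Summary statistics
success_count = sum(1 for r in results.values() if r.get('success'))
total_count = len(results)
# Guard against division by zero when no servers were targeted
success_rate = f"{success_count / total_count * 100:.1f}%" if total_count else "N/A"
# Paragraph collapses newlines, so separate the lines with <br/> tags
stats_text = (
    "Deployment statistics:<br/>"
    f"• Total servers: {total_count}<br/>"
    f"• Succeeded: {success_count}<br/>"
    f"• Failed: {total_count - success_count}<br/>"
    f"• Success rate: {success_rate}"
)
story.append(Paragraph(stats_text, styles['Normal']))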
# Footer
story.append(Spacer(1, 40))
footer_text = f"Report generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | Generated by: MobaXterm automated deployment system"
story.append(Paragraph(footer_text, styles['Normal']))
# Build the PDF
doc.build(story)
print(f"PDF report generated: {report_file}")
except ImportError:
    print("Error: reportlab is not installed. Run: pip install reportlab")
except Exception as e:
    print(f"Failed to generate PDF report: {e}")
def main():
    """Entry point."""
    cli = DeploymentCLI()
    cli.run()
if __name__ == "__main__":
    main()
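8. Summary and Best Practices
8.1 Core Value of MobaXterm in Linux Operations
- Unified management platform: SSH, SFTP, X11, and macro recording in a single tool
- Efficient batch operations: commands can be executed in multiple sessions at the same time
- Automation: recorded macros can be converted into Python scripts
- Convenient file management: drag-and-drop upload and download through the integrated SFTP browser
- Extensibility: supports custom plugin development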
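8.2 Implementation Roadmap
8.3 Key Success Factors
- Standardized configuration: maintain one MobaXterm configuration template for the whole team
- Knowledge sharing: keep an internal wiki of commonly used scripts and techniques
- Security first: audit SSH keys regularly and enable two-factor authentication
- Continuous optimization: review and tune the performance of automation scripts on a regular schedule
- Backup strategy: back up configuration files and session data regularly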
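The backup item above can be sketched in a few lines of Python. This is a minimal illustration, not a MobaXterm feature: the function name backup_config, the keep parameter, and the backup naming scheme are all assumptions made for this example. It copies one configuration file (for instance MobaXterm.ini) into a backup folder under a timestamped name and prunes older copies.

```python
import shutil
from datetime import datetime
from pathlib import Path


def backup_config(config_path, backup_dir, keep=5):
    """Copy config_path into backup_dir under a timestamped name.

    Only the newest `keep` backups are retained (hypothetical policy).
    Returns the path of the backup that was just created.
    """
    if keep < 1:
        raise ValueError("keep must be at least 1")
    config_path = Path(config_path)
    backup_dir = Path(backup_dir)
    backup_dir.mkdir(parents=True, exist_ok=True)

    # Fixed-width timestamp, so lexicographic order equals chronological order
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    target = backup_dir / f"{config_path.stem}_{stamp}{config_path.suffix}"
    shutil.copy2(config_path, target)  # copy2 also preserves file timestamps

    # Remove everything except the newest `keep` backups
    pattern = f"{config_path.stem}_*{config_path.suffix}"
    backups = sorted(backup_dir.glob(pattern))
    for old in backups[:-keep]:
        old.unlink()
    return target
```

A scheduled task could call backup_config on the configuration file once a day; where that file lives depends on the installation, so the path is left to the caller.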
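8.4 Solutions to Common Problems
# Problem 1: MobaXterm connections are slow
Solutions:
1. Enable compression: add Compression yes to the SSH client configuration (this helps mainly on slow links)
2. Use a lighter cipher: prefer aes128-ctr over aes256-ctr
3. Disable reverse DNS lookups: set UseDNS no in the server's /etc/ssh/sshd_config and restart sshd (this is a server-side option, not a client setting)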
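For an OpenSSH client that reads ~/.ssh/config, such as the ssh command in a local terminal, the first two settings can be written as a Host stanza. The sketch below only renders that stanza as text. The function name render_ssh_host_block and the alias web01 are hypothetical; Compression and Ciphers are standard ssh_config keywords. UseDNS is left out on purpose because it belongs in the server's sshd_config.

```python
def render_ssh_host_block(alias, hostname, user, port=22,
                          compression=True, ciphers=("aes128-ctr",)):
    """Return an OpenSSH client config stanza as a string."""
    lines = [
        f"Host {alias}",
        f"    HostName {hostname}",
        f"    User {user}",
        f"    Port {port}",
        f"    Compression {'yes' if compression else 'no'}",
    ]
    if ciphers:
        # ssh_config expects a comma-separated list, in order of preference
        lines.append(f"    Ciphers {','.join(ciphers)}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Example values only; replace them with a real host
    print(render_ssh_host_block("web01", "192.168.1.10", "user1"))
```

The output can be appended to ~/.ssh/config after review. The server must also accept the listed cipher, otherwise the connection will fail during negotiation.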
# Problem 2: File transfers are interrupted
Solutions:
1. Enable resume: turn on Resume broken transfers in the SFTP settings
2. Increase the buffer size: SFTPBufferSize=65536
3. Use parallel transfers: ParallelTransfers=4
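The idea behind resuming a broken transfer is to check how many bytes already arrived, seek to that offset in the source, and append only the missing tail. The sketch below shows this on local files so it runs without a server. The function name resume_copy is hypothetical, and this is not how MobaXterm implements the feature internally; the same offset logic would apply to a remote file handle opened through an SFTP library.

```python
import os


def resume_copy(src_path, dst_path, chunk_size=65536):
    """Append the missing tail of src_path to dst_path.

    Returns the number of bytes copied during this call.
    """
    done = os.path.getsize(dst_path) if os.path.exists(dst_path) else 0
    total = os.path.getsize(src_path)
    if done > total:
        raise ValueError("destination is larger than source; refusing to resume")

    copied = 0
    with open(src_path, "rb") as src, open(dst_path, "ab") as dst:
        src.seek(done)  # skip the part that was already transferred
        while True:
            chunk = src.read(chunk_size)
            if not chunk:
                break
            dst.write(chunk)
            copied += len(chunk)
    return copied
```

This sketch trusts the partial file to be a correct prefix of the source. A more careful version would compare a checksum of the existing bytes before appending.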
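# Problem 3: Sessions are disorganized
Solutions:
1. Organize with folders: group sessions by project and environment
2. Color coding: assign a different tab color to each environment
3. Auto save: enable the Auto save session feature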
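# Problem 4: Automation scripts are hard to maintain
Solutions:
1. Modular design: split functionality into independent modules
2. Version control: manage scripts with Git
3. Documentation: give every script a detailed README
4. Unit tests: write test cases to keep behavior stable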
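As a small illustration of points 1 and 4, the user@host:port parsing that the batch script earlier in this article does with cut can be isolated in one function and covered by unit tests. The names parse_server and ParseServerTest are hypothetical, and the sketch assumes IPv4 addresses or hostnames (IPv6 literals contain colons and would need extra handling).

```python
import unittest


def parse_server(spec, default_port=22):
    """Split 'user@host[:port]' into a (user, host, port) tuple."""
    user, sep, rest = spec.partition("@")
    if not sep or not user or not rest:
        raise ValueError(f"expected user@host[:port], got {spec!r}")
    host, _, port = rest.partition(":")
    if not host:
        raise ValueError(f"missing host in {spec!r}")
    return user, host, int(port) if port else default_port


class ParseServerTest(unittest.TestCase):
    def test_explicit_port(self):
        self.assertEqual(parse_server("root@192.168.1.30:2222"),
                         ("root", "192.168.1.30", 2222))

    def test_default_port(self):
        self.assertEqual(parse_server("user1@192.168.1.10"),
                         ("user1", "192.168.1.10", 22))

    def test_missing_user_is_rejected(self):
        with self.assertRaises(ValueError):
            parse_server("192.168.1.10:22")
```

Saved as a module, the tests can be run with python -m unittest followed by the module name. Keeping the parser separate from the code that opens connections is what makes it testable without any server.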
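This article has covered advanced MobaXterm techniques for Linux operations through worked examples, from basic session management to a complete automated deployment system. Applied consistently, these tools and methods can improve both operational efficiency and system reliability.
A tool is only as valuable as the way it is used. Start from real needs, automate something simple first, and build up a complete operations workflow step by step. Keep an eye on new technologies and continue refining the workflows you already have.
Recommended next steps:
- Study Ansible for automated operations in depth
- Learn Kubernetes container orchestration
- Get comfortable with the Prometheus monitoring system
- Explore Infrastructure as Code (IaC) practices
Best of luck on your Linux operations journey!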