一、系统架构设计

传感器为adxl355,需要将传感器采集到的数据定时备份至云端数据库。
二、具体实现步骤
1. 本地数据采集与缓存
工具选择:Python + SQLite (轻量级数据库)
# sensor_data_collector.py
import sqlite3
import time
from your_sensor_library import read_sensor # 替换为实际传感器库
# 创建本地缓存数据库
conn = sqlite3.connect('/path/to/sensor_cache.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS sensor_data
(id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT,
value REAL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
while True:
# 读取传感器数据
sensor_value = read_sensor()
device_id = "SENSOR-001"
# 写入SQLite
c.execute("INSERT INTO sensor_data (device_id, value) VALUES (?, ?)",
(device_id, sensor_value))
conn.commit()
time.sleep(5) # 采集间隔
2. 数据传输协议设计
推荐方案:MQTT协议 (物联网专用)
推荐方案:MQTT协议 (物联网专用)
| 组件 | 说明 |
|---|---|
| Broker | EMQX/HiveMQ (云端部署) |
| Topic | sensor/{device_id}/data |
| QoS | 1 (至少交付一次) |
备选方案:REST API (简单场景)
import requests
import json
API_URL = "https://your-domain.com/api/sensor-data"
API_KEY = "your_secret_key"
def send_to_cloud(data):
headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
response = requests.post(API_URL, data=json.dumps(data), headers=headers)
return response.status_code == 200
3. 云端MySQL数据接收
步骤1:创建安全访问凭证
-- 在云端MySQL执行
CREATE USER 'sensor_ingest'@'%' IDENTIFIED BY 'StrongPassword123!';
GRANT INSERT ON iot_db.sensor_data TO 'sensor_ingest'@'%';
步骤2:设计优化后的数据表
CREATE TABLE sensor_data (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(32) NOT NULL,
value DECIMAL(10,2) NOT NULL,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
origin_time TIMESTAMP NULL -- 可选:设备端原始时间
) ENGINE=InnoDB PARTITION BY RANGE (MONTH(received_at)) (
PARTITION p0 VALUES LESS THAN (2),
PARTITION p1 VALUES LESS THAN (3),
... -- 按月份分区提升查询性能
);
4. 定时备份任务实现
方案A:使用Linux Cron定时传输
# 每5分钟执行一次传输脚本
*/5 * * * * /usr/bin/python3 /path/to/data_sync.py
方案B:Python定时任务框架 (APScheduler)
# data_sync.py
from apscheduler.schedulers.blocking import BlockingScheduler
import sqlite3
import pymysql
def sync_data():
# 1. 从SQLite读取未同步数据
local_db = sqlite3.connect('/path/to/sensor_cache.db')
unsynced = local_db.execute("SELECT * FROM sensor_data WHERE synced=0").fetchall()
# 2. 批量插入MySQL (提升性能)
cloud_db = pymysql.connect(host="mysql.yourcloud.com", user="sensor_ingest",
password="StrongPassword123!", database="iot_db")
with cloud_db.cursor() as cursor:
sql = "INSERT INTO sensor_data (device_id, value, origin_time) VALUES (%s, %s, %s)"
cursor.executemany(sql, [(row[1], row[2], row[3]) for row in unsynced])
# 3. 标记已同步
ids = [row[0] for row in unsynced]
local_db.execute(f"UPDATE sensor_data SET synced=1 WHERE id IN ({','.join(['?']*len(ids))})", ids)
local_db.commit()
# 4. 清理旧数据 (可选)
local_db.execute("DELETE FROM sensor_data WHERE synced=1 AND timestamp < DATETIME('now','-7 days')")
if __name__ == "__main__":
scheduler = BlockingScheduler()
scheduler.add_job(sync_data, 'interval', minutes=5) # 5分钟同步一次
scheduler.start()
三、关键优化与安全措施
- 数据传输安全
- 启用MySQL SSL连接 (
pymysql使用ssl={'ca': '/path/to/ca.pem'}) - MQTT启用TLS加密 (端口8883)
- API使用HTTPS + JWT认证
- 启用MySQL SSL连接 (
断网容错机制
try:
# 尝试发送数据
except (NetworkError, Timeout) as e:
with open('/tmp/backup_queue.dat', 'ab') as f:
pickle.dump(data, f) # 序列化存储
云端性能优化
-- 创建时间索引
CREATE INDEX idx_received_at ON sensor_data(received_at);
-- 启用MySQL批量插入优化
SET GLOBAL max_allowed_packet=256*1024*1024;
监控告警
# 监控同步日志
tail -f /var/log/sensor_sync.log | grep -E 'ERROR|WARN'
# 监控MySQL写入延迟
SHOW STATUS LIKE 'Innodb_rows_inserted';