一、TestNG连接MySQL数据库
这个示例演示了:
- 1.完整的数据库操作封装
- 2.符合单元测试要求的隔离性实现
- 3.增删改查的原子操作
- 4.通过Assertions进行结果验证
- 5.遵循JDBC最佳实践
实际项目中需要:
- 1.将数据库配置移到.properties文件
- 2.使用连接池(如HikariCP)
- 3.添加异常处理机制
- 4.实现更复杂的事务管理
- 5.添加日志记录系统
代码示例:
import java.sql.*;
import org.testng.annotations.*;
import static org.testng.Assert.*;
/**
 * TestNG integration test covering the create, read, update and delete paths of {@link UserDAO}
 * against a MySQL database.
 *
 * <p>One JDBC connection is opened for the whole class, the {@code users} table is created when
 * it is missing, and the table is truncated before each test method.
 */
public class UserDAOTest {
    private Connection connection;
    private UserDAO userDAO;

    // Database settings (real projects should read these from a configuration file).
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/testdb?useSSL=false";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "yourpassword";

    // User fixture for the tests; its id is not sent on insert because the INSERT omits that column.
    private final User testUser = new User(0, "TestUser", 30, "test@email.com");

    /**
     * Opens the connection, builds the DAO and creates the {@code users} table if it is missing.
     *
     * @throws SQLException if connecting or creating the table fails
     */
    @BeforeClass
    public void initDatabase() throws SQLException {
        connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
        userDAO = new UserDAO(connection);
        try (Statement stmt = connection.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS users (" +
                    "id INT AUTO_INCREMENT PRIMARY KEY, " +
                    "name VARCHAR(100) NOT NULL, " +
                    "age INT, " +
                    "email VARCHAR(100))");
        }
    }

    /**
     * Empties the {@code users} table before every test method.
     *
     * @throws SQLException if the statement fails
     */
    @BeforeMethod
    public void clearData() throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            stmt.executeUpdate("TRUNCATE TABLE users");
        }
    }

    /**
     * Closes the shared connection once all tests of the class have run.
     *
     * @throws SQLException if closing fails
     */
    @AfterClass
    public void closeConnection() throws SQLException {
        if (connection != null && !connection.isClosed()) {
            connection.close();
        }
    }

    /*---------------- Test cases ----------------*/

    /** Inserts the fixture and checks both the generated id and the stored column values. */
    @Test(priority = 1)
    public void testInsertUser() throws SQLException {
        int generatedId = userDAO.insertUser(testUser);
        assertTrue(generatedId > 0, "返回的ID应该大于0");

        User insertedUser = userDAO.getUserById(generatedId);
        // Fail with an assertion instead of a NullPointerException when the row is missing.
        assertNotNull(insertedUser);
        assertEquals(insertedUser.getName(), testUser.getName());
        assertEquals(insertedUser.getAge(), testUser.getAge());
        assertEquals(insertedUser.getEmail(), testUser.getEmail());
    }

    /** Updates a previously inserted row and checks that the new values are read back. */
    @Test(priority = 2)
    public void testUpdateUser() throws SQLException {
        int id = userDAO.insertUser(testUser);

        User updatedUser = new User(id, "UpdatedUser", 35, "updated@email.com");
        userDAO.updateUser(updatedUser);

        User result = userDAO.getUserById(id);
        assertNotNull(result);
        assertEquals(result.getName(), updatedUser.getName());
        assertEquals(result.getAge(), updatedUser.getAge());
        assertEquals(result.getEmail(), updatedUser.getEmail());
    }

    /** Deletes a previously inserted row and checks that it can no longer be found. */
    @Test(priority = 3)
    public void testDeleteUser() throws SQLException {
        int id = userDAO.insertUser(testUser);
        userDAO.deleteUser(id);
        assertNull(userDAO.getUserById(id), "用户应该已被删除");
    }

    /** Inserts two rows and checks that the full listing returns exactly two users. */
    @Test(priority = 4)
    public void testGetAllUsers() throws SQLException {
        userDAO.insertUser(new User(0, "User1", 25, "u1@email.com"));
        userDAO.insertUser(new User(0, "User2", 30, "u2@email.com"));
        assertEquals(userDAO.getAllUsers().size(), 2);
    }

    /*---------------- DAO implementation ----------------*/

    /** JDBC data-access object for the {@code users} table; it does not own the connection. */
    static class UserDAO {
        private final Connection conn;

        public UserDAO(Connection conn) {
            this.conn = conn;
        }

        /**
         * Inserts a user.
         *
         * @param user source of the name, age and email columns; its id is not used
         * @return the generated key, or {@code -1} if the driver reported none
         * @throws SQLException if the statement fails
         */
        public int insertUser(User user) throws SQLException {
            String sql = "INSERT INTO users(name, age, email) VALUES(?, ?, ?)";
            try (PreparedStatement pstmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
                pstmt.setString(1, user.getName());
                pstmt.setInt(2, user.getAge());
                pstmt.setString(3, user.getEmail());
                pstmt.executeUpdate();
                try (ResultSet rs = pstmt.getGeneratedKeys()) {
                    if (rs.next()) {
                        return rs.getInt(1);
                    }
                }
            }
            return -1; // no generated key was returned
        }

        /**
         * Looks a user up by primary key.
         *
         * @return the matching user, or {@code null} when no row has that id
         * @throws SQLException if the query fails
         */
        public User getUserById(int id) throws SQLException {
            String sql = "SELECT * FROM users WHERE id = ?";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                pstmt.setInt(1, id);
                try (ResultSet rs = pstmt.executeQuery()) {
                    if (rs.next()) {
                        return mapRow(rs);
                    }
                }
            }
            return null;
        }

        /**
         * Overwrites name, age and email of the row whose id equals {@code user.getId()}.
         *
         * @throws SQLException if the statement fails
         */
        public void updateUser(User user) throws SQLException {
            String sql = "UPDATE users SET name=?, age=?, email=? WHERE id=?";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                pstmt.setString(1, user.getName());
                pstmt.setInt(2, user.getAge());
                pstmt.setString(3, user.getEmail());
                pstmt.setInt(4, user.getId());
                pstmt.executeUpdate();
            }
        }

        /**
         * Deletes the row with the given id.
         *
         * @throws SQLException if the statement fails
         */
        public void deleteUser(int id) throws SQLException {
            String sql = "DELETE FROM users WHERE id=?";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                pstmt.setInt(1, id);
                pstmt.executeUpdate();
            }
        }

        /**
         * Reads every row of the table.
         *
         * @return all users, possibly empty, never {@code null}
         * @throws SQLException if the query fails
         */
        // java.util types are fully qualified because the snippet's imports do not include java.util.
        public java.util.List<User> getAllUsers() throws SQLException {
            java.util.List<User> users = new java.util.ArrayList<>();
            String sql = "SELECT * FROM users";
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    users.add(mapRow(rs));
                }
            }
            return users;
        }

        /** Builds a {@link User} from the row the cursor currently points at. */
        private static User mapRow(ResultSet rs) throws SQLException {
            return new User(
                    rs.getInt("id"),
                    rs.getString("name"),
                    rs.getInt("age"),
                    rs.getString("email"));
        }
    }

    /*---------------- Entity ----------------*/

    /** Immutable value holder for one row of the {@code users} table. */
    static class User {
        private final int id;
        private final String name;
        private final int age;
        private final String email;

        public User(int id, String name, int age, String email) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.email = email;
        }

        public int getId() {
            return id;
        }

        public String getName() {
            return name;
        }

        public int getAge() {
            return age;
        }

        public String getEmail() {
            return email;
        }
    }
}
关键点说明:
- 1.测试环境搭建
- •使用 @BeforeClass 初始化数据库连接
- •通过 @BeforeMethod 保证每个测试前有干净的测试环境
- •使用内存数据库或TRUNCATE保证测试隔离性
- 2.数据库操作最佳实践
- •使用PreparedStatement防止SQL注入
- •资源自动关闭(try-with-resources)
- •事务管理(实际项目应添加)
- •返回生成的键值(用于获取自增ID)
- 3.测试覆盖场景
- •增:验证返回的ID和实际插入数据
- •删:验证数据消失
- •改:验证更新后的字段值
- •查:验证返回结果的数量和内容
- 4.TestNG特性使用
- •通过 priority 控制执行顺序
- •使用Assertions进行结果验证
- •测试方法独立运行但有序关联
添加测试增强项:
// 在测试类中添加
/**
 * Attempts to insert a user whose name and email are {@code null} and expects the DAO call to
 * fail with an {@link SQLException}.
 */
@Test(expectedExceptions = SQLException.class)
public void testInvalidInsert() throws SQLException {
    // Row that is meant to violate the table constraints.
    User invalidUser = new User(0, null, 0, null);
    userDAO.insertUser(invalidUser);
}
/**
 * Runs an insert inside an explicit transaction: commits on success, rolls back on failure and
 * always restores auto-commit afterwards.
 *
 * @throws SQLException if any JDBC call fails; a rollback failure is attached as suppressed
 */
@Test
public void testTransaction() throws SQLException {
    int id;
    try {
        // The test class keeps its connection in the field named "connection".
        connection.setAutoCommit(false); // turn off auto-commit
        // Business operations...
        id = userDAO.insertUser(testUser);
        // Other operations...
        connection.commit(); // commit the transaction
    } catch (SQLException e) {
        try {
            connection.rollback(); // undo the partial work
        } catch (SQLException rollbackFailure) {
            // Keep the original failure as the primary exception.
            e.addSuppressed(rollbackFailure);
        }
        throw e;
    } finally {
        connection.setAutoCommit(true);
    }
    // The committed row must be readable afterwards.
    assertNotNull(userDAO.getUserById(id));
}
下面是一个使用Java + TestNG + MySQL实现数据库增删改查(CRUD)的完整示例,包含详细注释和测试用例:
import java.sql.*;
import org.testng.annotations.*;
import static org.testng.Assert.*;
/**
 * TestNG integration test covering the create, read, update and delete paths of {@link UserDAO}
 * against a MySQL database.
 *
 * <p>One JDBC connection is opened for the whole class, the {@code users} table is created when
 * it is missing, and the table is truncated before each test method.
 */
public class UserDAOTest {
    private Connection connection;
    private UserDAO userDAO;

    // Database settings (real projects should read these from a configuration file).
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/testdb?useSSL=false";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "yourpassword";

    // User fixture for the tests; its id is not sent on insert because the INSERT omits that column.
    private final User testUser = new User(0, "TestUser", 30, "test@email.com");

    /**
     * Opens the connection, builds the DAO and creates the {@code users} table if it is missing.
     *
     * @throws SQLException if connecting or creating the table fails
     */
    @BeforeClass
    public void initDatabase() throws SQLException {
        connection = DriverManager.getConnection(JDBC_URL, USERNAME, PASSWORD);
        userDAO = new UserDAO(connection);
        try (Statement stmt = connection.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS users (" +
                    "id INT AUTO_INCREMENT PRIMARY KEY, " +
                    "name VARCHAR(100) NOT NULL, " +
                    "age INT, " +
                    "email VARCHAR(100))");
        }
    }

    /**
     * Empties the {@code users} table before every test method.
     *
     * @throws SQLException if the statement fails
     */
    @BeforeMethod
    public void clearData() throws SQLException {
        try (Statement stmt = connection.createStatement()) {
            stmt.executeUpdate("TRUNCATE TABLE users");
        }
    }

    /**
     * Closes the shared connection once all tests of the class have run.
     *
     * @throws SQLException if closing fails
     */
    @AfterClass
    public void closeConnection() throws SQLException {
        if (connection != null && !connection.isClosed()) {
            connection.close();
        }
    }

    /*---------------- Test cases ----------------*/

    /** Inserts the fixture and checks both the generated id and the stored column values. */
    @Test(priority = 1)
    public void testInsertUser() throws SQLException {
        int generatedId = userDAO.insertUser(testUser);
        assertTrue(generatedId > 0, "返回的ID应该大于0");

        User insertedUser = userDAO.getUserById(generatedId);
        // Fail with an assertion instead of a NullPointerException when the row is missing.
        assertNotNull(insertedUser);
        assertEquals(insertedUser.getName(), testUser.getName());
        assertEquals(insertedUser.getAge(), testUser.getAge());
        assertEquals(insertedUser.getEmail(), testUser.getEmail());
    }

    /** Updates a previously inserted row and checks that the new values are read back. */
    @Test(priority = 2)
    public void testUpdateUser() throws SQLException {
        int id = userDAO.insertUser(testUser);

        User updatedUser = new User(id, "UpdatedUser", 35, "updated@email.com");
        userDAO.updateUser(updatedUser);

        User result = userDAO.getUserById(id);
        assertNotNull(result);
        assertEquals(result.getName(), updatedUser.getName());
        assertEquals(result.getAge(), updatedUser.getAge());
        assertEquals(result.getEmail(), updatedUser.getEmail());
    }

    /** Deletes a previously inserted row and checks that it can no longer be found. */
    @Test(priority = 3)
    public void testDeleteUser() throws SQLException {
        int id = userDAO.insertUser(testUser);
        userDAO.deleteUser(id);
        assertNull(userDAO.getUserById(id), "用户应该已被删除");
    }

    /** Inserts two rows and checks that the full listing returns exactly two users. */
    @Test(priority = 4)
    public void testGetAllUsers() throws SQLException {
        userDAO.insertUser(new User(0, "User1", 25, "u1@email.com"));
        userDAO.insertUser(new User(0, "User2", 30, "u2@email.com"));
        assertEquals(userDAO.getAllUsers().size(), 2);
    }

    /*---------------- DAO implementation ----------------*/

    /** JDBC data-access object for the {@code users} table; it does not own the connection. */
    static class UserDAO {
        private final Connection conn;

        public UserDAO(Connection conn) {
            this.conn = conn;
        }

        /**
         * Inserts a user.
         *
         * @param user source of the name, age and email columns; its id is not used
         * @return the generated key, or {@code -1} if the driver reported none
         * @throws SQLException if the statement fails
         */
        public int insertUser(User user) throws SQLException {
            String sql = "INSERT INTO users(name, age, email) VALUES(?, ?, ?)";
            try (PreparedStatement pstmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
                pstmt.setString(1, user.getName());
                pstmt.setInt(2, user.getAge());
                pstmt.setString(3, user.getEmail());
                pstmt.executeUpdate();
                try (ResultSet rs = pstmt.getGeneratedKeys()) {
                    if (rs.next()) {
                        return rs.getInt(1);
                    }
                }
            }
            return -1; // no generated key was returned
        }

        /**
         * Looks a user up by primary key.
         *
         * @return the matching user, or {@code null} when no row has that id
         * @throws SQLException if the query fails
         */
        public User getUserById(int id) throws SQLException {
            String sql = "SELECT * FROM users WHERE id = ?";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                pstmt.setInt(1, id);
                try (ResultSet rs = pstmt.executeQuery()) {
                    if (rs.next()) {
                        return mapRow(rs);
                    }
                }
            }
            return null;
        }

        /**
         * Overwrites name, age and email of the row whose id equals {@code user.getId()}.
         *
         * @throws SQLException if the statement fails
         */
        public void updateUser(User user) throws SQLException {
            String sql = "UPDATE users SET name=?, age=?, email=? WHERE id=?";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                pstmt.setString(1, user.getName());
                pstmt.setInt(2, user.getAge());
                pstmt.setString(3, user.getEmail());
                pstmt.setInt(4, user.getId());
                pstmt.executeUpdate();
            }
        }

        /**
         * Deletes the row with the given id.
         *
         * @throws SQLException if the statement fails
         */
        public void deleteUser(int id) throws SQLException {
            String sql = "DELETE FROM users WHERE id=?";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                pstmt.setInt(1, id);
                pstmt.executeUpdate();
            }
        }

        /**
         * Reads every row of the table.
         *
         * @return all users, possibly empty, never {@code null}
         * @throws SQLException if the query fails
         */
        // java.util types are fully qualified because the snippet's imports do not include java.util.
        public java.util.List<User> getAllUsers() throws SQLException {
            java.util.List<User> users = new java.util.ArrayList<>();
            String sql = "SELECT * FROM users";
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    users.add(mapRow(rs));
                }
            }
            return users;
        }

        /** Builds a {@link User} from the row the cursor currently points at. */
        private static User mapRow(ResultSet rs) throws SQLException {
            return new User(
                    rs.getInt("id"),
                    rs.getString("name"),
                    rs.getInt("age"),
                    rs.getString("email"));
        }
    }

    /*---------------- Entity ----------------*/

    /** Immutable value holder for one row of the {@code users} table. */
    static class User {
        private final int id;
        private final String name;
        private final int age;
        private final String email;

        public User(int id, String name, int age, String email) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.email = email;
        }

        public int getId() {
            return id;
        }

        public String getName() {
            return name;
        }

        public int getAge() {
            return age;
        }

        public String getEmail() {
            return email;
        }
    }
}
通过 @DataProvider 实现数据驱动测试,需添加MySQL驱动依赖:
import org.testng.annotations.*;
import java.sql.*;
/** Data-driven TestNG example whose test parameters are loaded from a MySQL table. */
public class MySQLTest {

    /**
     * Reads every row of {@code test_data} and exposes its first two columns as test parameters.
     *
     * @return one two-element array per row, in the order the query returned them
     * @throws Exception if the driver class cannot be loaded or a JDBC call fails
     */
    @DataProvider(name = "dbData")
    public Object[][] getData() throws Exception {
        Class.forName("com.mysql.cj.jdbc.Driver");
        // java.util types are fully qualified because the snippet's imports do not include java.util.
        java.util.List<Object[]> data = new java.util.ArrayList<>();
        // try-with-resources closes the result set, statement and connection even on failure.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://localhost:3306/test", "root", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM test_data")) {
            while (rs.next()) {
                data.add(new Object[]{rs.getString(1), rs.getString(2)});
            }
        }
        return data.toArray(new Object[0][]);
    }

    /** Receives one database row per invocation and prints it. */
    @Test(dataProvider = "dbData")
    public void testDB(String param1, String param2) {
        System.out.println(param1 + ":" + param2);
    }
}
- 使用JDBC建立MySQL连接
- 通过 @DataProvider 将查询结果转换为测试参数
- 测试方法接收数据库查询结果作为输入参数
依赖
<!-- pom.xml -->
<dependencies>
<!-- MySQL驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
<!-- TestNG -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.4.0</version>
</dependency>
</dependencies>
二、TestNG连接Redis
一个完整的Redis操作示例,包含字符串、哈希、列表、集合、有序集合等数据结构的CRUD操作和相应的TestNG测试
代码示例:
import org.testng.annotations.*;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.util.*;
import static org.testng.Assert.*;
public class RedisOperationsTest {
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
private static JedisPool jedisPool;
private Jedis jedis;
// Redis操作工具类
public static class RedisUtils {
public static void setKey(Jedis jedis, String key, String value) {
jedis.set(key, value);
}
public static String getKey(Jedis jedis, String key) {
return jedis.get(key);
}
public static void setHash(Jedis jedis, String key, Map<String, String> hash) {
jedis.hset(key, hash);
}
public static Map<String, String> getHash(Jedis jedis, String key) {
return jedis.hgetAll(key);
}
public static void addToList(Jedis jedis, String key, String... values) {
jedis.lpush(key, values);
}
public static List<String> getListRange(Jedis jedis, String key, long start, long end) {
return jedis.lrange(key, start, end);
}
public static void addToSet(Jedis jedis, String key, String... members) {
jedis.sadd(key, members);
}
public static Set<String> getSetMembers(Jedis jedis, String key) {
return jedis.smembers(key);
}
public static void addToSortedSet(Jedis jedis, String key, Map<String, Double> scoreMembers) {
scoreMembers.forEach((member, score) -> jedis.zadd(key, score, member));
}
public static Set<String> getSortedSetRange(Jedis jedis, String key, long start, long end) {
return jedis.zrange(key, start, end);
}
}
@BeforeClass
public static void setupPool() {
// 配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(10);
jedisPool = new JedisPool(poolConfig, REDIS_HOST, REDIS_PORT);
}
@BeforeMethod
public void getResource() {
// 获取Jedis实例
jedis = jedisPool.getResource();
// 清除测试环境(生产环境不要用flushDB)
jedis.select(15); // 使用独立测试DB
jedis.flushDB();
}
@AfterMethod
public void returnResource() {
if (jedis != null) {
jedis.close();
}
}
@AfterClass
public static void tearDownPool() {
if (jedisPool != null) {
jedisPool.close();
}
}
/*---------------- 字符串类型测试 ----------------*/
@Test
public void testStringOperations() {
// 设置值
RedisUtils.setKey(jedis, "test:string:key", "hello redis");
// 获取值
String value = RedisUtils.getKey(jedis, "test:string:key");
assertEquals(value, "hello redis");
// 更新值
RedisUtils.setKey(jedis, "test:string:key", "updated value");
value = RedisUtils.getKey(jedis, "test:string:key");
assertEquals(value, "updated value");
// 删除key
jedis.del("test:string:key");
value = RedisUtils.getKey(jedis, "test:string:key");
assertNull(value, "Key should be deleted");
}
/*---------------- 哈希类型测试 ----------------*/
@Test
public void testHashOperations() {
Map<String, String> user = new HashMap<>();
user.put("name", "Alice");
user.put("age", "28");
user.put("email", "alice@example.com");
// 设置哈希值
RedisUtils.setHash(jedis, "user:1001", user);
// 获取完整哈希
Map<String, String> result = RedisUtils.getHash(jedis, "user:1001");
assertEquals(result.get("name"), "Alice");
assertEquals(result.get("age"), "28");
// 更新单个字段
jedis.hset("user:1001", "age", "29");
assertEquals(jedis.hget("user:1001", "age"), "29");
// 删除字段
jedis.hdel("user:1001", "email");
result = RedisUtils.getHash(jedis, "user:1001");
assertFalse(result.containsKey("email"), "Email field should be deleted");
}
/*---------------- 列表类型测试 ----------------*/
@Test
public void testListOperations() {
// 添加列表元素
RedisUtils.addToList(jedis, "recent:users", "user3", "user2", "user1");
// 获取列表范围
List<String> recentUsers = RedisUtils.getListRange(jedis, "recent:users", 0, -1);
assertEquals(recentUsers.size(), 3);
assertEquals(recentUsers.get(0), "user1"); // 最近添加的在前面
// 修改指定位置的元素
jedis.lset("recent:users", 1, "user99");
assertEquals(jedis.lindex("recent:users", 1), "user99");
// 删除元素
jedis.lrem("recent:users", 1, "user99");
recentUsers = RedisUtils.getListRange(jedis, "recent:users", 0, -1);
assertEquals(recentUsers.size(), 2);
}
/*---------------- 集合类型测试 ----------------*/
@Test
public void testSetOperations() {
// 添加集合成员
RedisUtils.addToSet(jedis, "user:1001:followers", "follower1", "follower2", "follower3");
// 获取所有成员
Set<String> followers = RedisUtils.getSetMembers(jedis, "user:1001:followers");
assertEquals(followers.size(), 3);
assertTrue(followers.contains("follower2"));
// 删除成员
jedis.srem("user:1001:followers", "follower2");
followers = RedisUtils.getSetMembers(jedis, "user:1001:followers");
assertFalse(followers.contains("follower2"), "Follower2 should be removed");
// 集合操作:交集
jedis.sadd("user:1002:followers", "follower3", "follower4");
Set<String> common = jedis.sinter("user:1001:followers", "user:1002:followers");
assertEquals(common.size(), 1);
assertTrue(common.contains("follower3"));
}
/*---------------- 有序集合测试 ----------------*/
@Test
public void testSortedSetOperations() {
Map<String, Double> scores = new HashMap<>();
scores.put("playerA", 2500.0);
scores.put("playerB", 1800.0);
scores.put("playerC", 3200.0);
// 添加带分值的成员
RedisUtils.addToSortedSet(jedis, "game:leaderboard", scores);
// 按分值范围获取
Set<String> topPlayers = RedisUtils.getSortedSetRange(jedis, "game:leaderboard", 0, 1);
assertEquals(topPlayers.size(), 2);
assertEquals(new ArrayList<>(topPlayers).get(0), "playerC"); // 最高分排在前面
// 更新分值
jedis.zincrby("game:leaderboard", 500.0, "playerB");
double newScore = jedis.zscore("game:leaderboard", "playerB");
assertEquals(newScore, 2300.0, 0.001);
// 按排名范围删除
jedis.zremrangeByRank("game:leaderboard", 2, 2); // 删除第三名
assertEquals(jedis.zcard("game:leaderboard"), 2);
}
/*---------------- 高级操作测试 ----------------*/
@Test
public void testTransactionAndExpire() {
// 事务操作
jedis.watch("balance"); // 监控key
Transaction t = jedis.multi();
t.set("balance", "100");
t.expire("balance", 10); // 10秒后过期
t.exec();
// 验证事务结果
String balance = RedisUtils.getKey(jedis, "balance");
assertEquals(balance, "100");
// 检查TTL
long ttl = jedis.ttl("balance");
assertTrue(ttl > 0 && ttl <= 10, "Key should have TTL set");
// 设置NX(不存在才设置)
long result = jedis.setnx("unique:key", "value");
assertEquals(result, 1);
// 再次设置应该失败
result = jedis.setnx("unique:key", "new-value");
assertEquals(result, 0);
}
@Test
public void testKeyOperations() {
RedisUtils.setKey(jedis, "temp:key", "data");
// 检查是否存在
assertTrue(jedis.exists("temp:key"));
// 重命名key
jedis.rename("temp:key", "renamed:key");
assertTrue(jedis.exists("renamed:key"));
assertFalse(jedis.exists("temp:key"));
// 设置过期时间
jedis.expire("renamed:key", 2); // 2秒后过期
// 检查过期
try {
Thread.sleep(2500);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertFalse(jedis.exists("renamed:key"), "Key should be expired");
}
}
- 1.连接池管理:
- •使用 JedisPool 管理连接,提高性能
- •@BeforeMethod 初始化每个测试的Jedis实例
- •@AfterMethod 确保资源正确释放
- •使用独立测试数据库(DB 15)保证测试隔离
- 2.Redis数据结构操作:
// String
jedis.set("key", "value");
jedis.get("key");
// Hash
jedis.hset("user:1001", Map.of("name","Alice"));
jedis.hgetAll("user:1001");
// List
jedis.lpush("list", "item1", "item2");
jedis.lrange("list", 0, -1);
// Set
jedis.sadd("set", "member1", "member2");
jedis.smembers("set");
// Sorted set
jedis.zadd("zset", 1.0, "memberA");
jedis.zrange("zset", 0, -1);
- 3.高级功能测试:
- •事务处理:MULTI/EXEC/WATCH 命令组合
- •键过期:EXPIRE/TTL 测试
- •原子操作:SETNX(不存在才设置)
- •数据过期验证:TTL和自动删除
- •键重命名:RENAME 操作
- 4.测试验证点:
- •数据操作的正确性(CRUD)
- •数据结构特性验证(有序集合排序、集合去重等)
- •原子操作和事务的隔离性
- •键生命周期管理(TTL/过期)
最佳实践建议:
1.测试环境隔离:
- •使用专用测试数据库(非生产DB)
- •@BeforeMethod 中调用 jedis.flushDB() 清理状态
- •每个测试方法使用独立键前缀(如 test:string:key)
2.连接管理:
// 正确的资源关闭方式 try (Jedis jedis = jedisPool.getResource()) { // 操作Redis... } // 自动关闭连接
3.Redis配置优化:
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(20);       // maximum number of connections
config.setMaxIdle(10);        // maximum number of idle connections
config.setMinIdle(5);         // minimum number of idle connections
config.setTestOnBorrow(true); // validate a connection when it is borrowed
4.异常处理增强:
/**
 * Pings a Redis client pointed at {@code invalid.host} and expects the call to fail with a
 * {@code JedisConnectionException}.
 */
@Test(expectedExceptions = JedisConnectionException.class)
public void testConnectionError() {
    try (Jedis brokenJedis = new Jedis("invalid.host", 6379)) {
        brokenJedis.ping();
    }
}
所需依赖:
<dependencies>
<!-- Jedis Redis客户端 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.0</version>
</dependency>
<!-- TestNG测试框架 -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.7.1</version>
<scope>test</scope>
</dependency>
</dependencies>
三、TestNG连接Kafka
一个完整的 Kafka 生产者和消费者操作测试框架,使用 TestNG 进行全面的集成测试
代码示例:
import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.testng.annotations.*;
/**
 * TestNG integration test that produces one message to a Kafka topic and checks that a consumer
 * subscribed to the same topic receives at least one record.
 */
public class KafkaTest {
    private static final String TOPIC = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Sends one message, then polls the topic once and requires a non-empty result. */
    @Test
    public void testProduceConsume() {
        // Produce the test message; closing the producer happens via try-with-resources.
        try (Producer<String, String> producer = new KafkaProducer<>(producerProperties())) {
            producer.send(new ProducerRecord<>(TOPIC, "key1", "TestNG Message"));
        }

        // Consume and verify.
        int received;
        try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties())) {
            consumer.subscribe(Collections.singleton(TOPIC));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            received = records.count();
        }
        // A TestNG assertion is used because the Java "assert" keyword is skipped unless the
        // JVM runs with -ea, which would let this test pass without checking anything.
        assertTrue(received > 0, "未收到消息");
    }

    /** Builds the producer configuration: broker address plus String key/value serializers. */
    private static Properties producerProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /**
     * Builds the consumer configuration: broker address, consumer group, offset reset policy
     * and String key/value deserializers.
     */
    private static Properties consumerProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
代码说明:
- 配置Kafka生产者和消费者客户端
- 测试方法包含完整的消息生产和消费验证流程
- 使用try-with-resources确保资源释放
关键点总结
- MySQL:通过JDBC连接,适合数据驱动测试场景
- Redis:使用Jedis客户端,适合缓存验证测试
- Kafka:需要配置生产/消费者,适合消息系统集成测试
- 所有示例均采用TestNG的生命周期注解管理测试资源