Java 应用:生产者用单线程分页查询数据库,并把结果写入 Redis 队列(list);消费者用 30 个线程并发消费这个 list。之前从未出现过这个报错,但更新了生产者的 SQL 之后(猜测是 SQL 变快、写入速度提高了?),每次正常运行一段时间后,生产者先抛出异常,随后消费者也跟着抛出异常。
报的异常如下
生产者异常: :"sendQueueMessageInfo leftPushAll : 异常 : org.springframework.data.redis.RedisConnectionFailureException: Unexpected end of stream.; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream. org.springframework.data.redis.connection.jedis.JedisExceptionConverter.convert(JedisExceptionConverter.java:67) org.springframework.data.redis.connection.jedis.JedisExceptionConverter.convert(JedisExceptionConverter.java:41) org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) org.springframework.data.redis.connection.jedis.JedisConnection.convertJedisAccessException(JedisConnection.java:142) org.springframework.data.redis.connection.jedis.JedisListCommands.convertJedisAccessException(JedisListCommands.java:486) org.springframework.data.redis.connection.jedis.JedisListCommands.lPush(JedisListCommands.java:84) org.springframework.data.redis.connection.DefaultedRedisConnection.lPush(DefaultedRedisConnection.java:444) org.springframework.data.redis.core.DefaultListOperations.lambda$leftPushAll$2(DefaultListOperations.java:124) org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:224) org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:184) org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:95) org.springframework.data.redis.core.DefaultListOperations.leftPushAll(DefaultListOperations.java:124) com.service.ds.exec.DataSourceActuator.sendQueueMessageInfo(DataSourceActuator.java:392) com.service.ds.exec.DataSourceActuator.execAndPushQueue(DataSourceActuator.java:214) com.controller.rest.etl.EtlTaskRestController$1.run(EtlTaskRestController.java:86) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)\
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream. redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:199) redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) redis.clients.jedis.Protocol.process(Protocol.java:153) redis.clients.jedis.Protocol.read(Protocol.java:218) redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:341) redis.clients.jedis.Connection.getIntegerReply(Connection.java:266) redis.clients.jedis.BinaryJedis.lpush(BinaryJedis.java:1119) org.springframework.data.redis.connection.jedis.JedisListCommands.lPush(JedisListCommands.java:82)\n\t... 12 more\n","stackTrace":"","reqId":"","elapsedTime":null}
消费者异常: 1","className":"com.service.schedule.SchedulerTask$1","lineNumber":"63","message":"队列消费 : 异常 : org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:281) org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:464) org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:132) org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:95) org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:82) org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:211) org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:184) org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:95) org.springframework.data.redis.core.DefaultListOperations.leftPush(DefaultListOperations.java:99) com.service.etl.consumer.QueueConsumer.retryQueueMessage(QueueConsumer.java:101) com.service.etl.consumer.QueueConsumer.consume(QueueConsumer.java:180) com.service.schedule.SchedulerTask$1.run(SchedulerTask.java:60) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)java.lang.Thread.run(Thread.java:745)\nCaused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the poolredis.clients.util.Pool.getResource(Pool.java:53)redis.clients.jedis.JedisPool.getResource(JedisPool.java:226)redis.clients.jedis.JedisPool.getResource(JedisPool.java:16) 
org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:271)\n\t... 14 more\nCaused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Connection resetredis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202) redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) redis.clients.jedis.Protocol.process(Protocol.java:153) redis.clients.jedis.Protocol.read(Protocol.java:218) redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:341) redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:240) redis.clients.jedis.BinaryJedis.auth(BinaryJedis.java:2223) redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:108) org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:889) org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:424) org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:349) redis.clients.util.Pool.getResource(Pool.java:49)\n\t... 17 more\nCaused by: java.net.SocketException: Connection reset java.net.SocketInputStream.read(SocketInputStream.java:209) java.net.SocketInputStream.read(SocketInputStream.java:141) java.net.SocketInputStream.read(SocketInputStream.java:127) redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196)\n\t... 28 more\n","stackTrace":"","reqId":"","elapsedTime":null}
redis 配置
/**
 * Builds the {@link RedisTemplate} used for {@code QueueMessageInfo} queue entries.
 *
 * <p>Keys and hash keys are serialized as plain strings; values are serialized as JSON
 * through a Jackson {@link ObjectMapper} that sees every property regardless of
 * visibility and has default typing enabled for non-final types. No hash-value
 * serializer is configured here.
 *
 * @param factory connection factory the template obtains its Redis connections from
 * @return the initialized template
 */
public RedisTemplate<String, QueueMessageInfo> redisQueueMessageTemplate(RedisConnectionFactory factory) {
    // JSON mapper: expose all members and enable default typing for non-final types.
    ObjectMapper mapper = new ObjectMapper();
    mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

    // Value serializer: Jackson2JsonRedisSerializer backed by the mapper above.
    Jackson2JsonRedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
    valueSerializer.setObjectMapper(mapper);

    RedisTemplate<String, QueueMessageInfo> queueTemplate = new RedisTemplate<>();
    queueTemplate.setConnectionFactory(factory);
    queueTemplate.setKeySerializer(new StringRedisSerializer());
    queueTemplate.setHashKeySerializer(new StringRedisSerializer());
    queueTemplate.setValueSerializer(valueSerializer);
    queueTemplate.afterPropertiesSet();
    return queueTemplate;
}
/**
 * Creates the pooled, standalone Jedis-based {@link RedisConnectionFactory}.
 *
 * <p>Pool limits come from {@code maxActive}, {@code maxIdle}, {@code minIdle} and
 * {@code maxWait}; connections are validated when borrowed and while idle, but not
 * when returned. The client read timeout is {@code timeout} milliseconds, and SSL is
 * enabled only when {@code ssl} is true.
 *
 * @return the configured connection factory
 */
@Bean
public RedisConnectionFactory connectionFactory() {
    // Connection pool sizing and validation policy.
    JedisPoolConfig pool = new JedisPoolConfig();
    pool.setMaxTotal(maxActive);
    pool.setMaxIdle(maxIdle);
    pool.setMinIdle(minIdle);
    pool.setMaxWaitMillis(maxWait);
    pool.setTestOnBorrow(true);
    pool.setTestWhileIdle(true);
    pool.setTestOnReturn(false);

    // Both variants share the same pooled builder; only the SSL switch differs.
    JedisClientConfiguration.JedisClientConfigurationBuilder clientBuilder = JedisClientConfiguration.builder()
            .usePooling().poolConfig(pool).and()
            .readTimeout(Duration.ofMillis(timeout));
    JedisClientConfiguration clientConfig = ssl ? clientBuilder.useSsl().build() : clientBuilder.build();

    // Target server: host, port, password and database index.
    RedisStandaloneConfiguration server = new RedisStandaloneConfiguration();
    server.setHostName(host);
    server.setPort(port);
    server.setPassword(password);
    server.setDatabase(redisDatabase);

    RedisConnectionFactory jedisFactory = new JedisConnectionFactory(server, clientConfig);
    logger.info("初始化 redis 链接池 : JedisPoolConfig");
    return jedisFactory;
}
# Maximum number of connections the pool can allocate at a given time. Use a negative value for no limit.
#spring.redis.pool.max-active=200
spring.redis.pool.max-active=200
# Maximum number of "idle" connections in the pool. Use a negative value for an unlimited number of idle connections.
# spring.redis.pool.max-idle=20
spring.redis.pool.max-idle=20
# Maximum time (in milliseconds) a connection request should block when the pool is exhausted before an exception is thrown. Use a negative value to block indefinitely.
spring.redis.pool.max-wait=9000
# Target minimum number of idle connections to keep in the pool. This setting only has an effect when it is positive.
#spring.redis.pool.min-idle=20
spring.redis.pool.min-idle=20
1
rqxiao OP 额 先用 LettuceConnectionFactory 了
|