首页  ·  知识 ·  架构设计
Kubernetes环境Java微服务应用开发手册
Jagger Wang  CIO之家的朋友们  综合  编辑:means   图片来源:网络
本文来源于于笔者所带领的研发团队在Kubernetes环境下开发Java微服务应用的实践总结。Kubernetes本身提供了许多微服务架构组件,比如使用Service来替代Eureka、Nacos等服务注册发现组件,使用

录结构

本应用的各微服务项目遵循 干净架构最佳实践,以用户微服务为例,其项目目录结构如下:


.
├── adapter // 适配层,包括对外提供用例层服务(API、CLI 等),以及用例层调用外部服务
│   ├── Application.java
│   ├── api // 以 API 方式调用用例
│   │   ├── config
│   │   ├── controller
│   │   └── exception
│   ├── cli // 以 CLI 方式调用用例
│   │   └── job
│   ├── dto // 对外输入和输出的对象
│   ├── gui // 以 GUI 方式调用用例
│   └── port // 外部依赖服务接口实现
│       │── dao // 数据访问接口实现,可以使用 MyBatis 或 JPA,切换时不会影响到用例层
│       │── email // 邮件发送接口实现
│       │── mobile // 短信发送接口实现
│       └── pay // 支付接口实现
│── entity // 业务实体
│  ├── UserBO.java // 用户实体
│  └── RoleBO.java // 角色实体
│── usecase // 用例层,亦即业务逻辑层
│  ├── UserUsecase.java // 用户模块用例
│  ├── exception // 业务异常定义
│  └── port // 外部依赖服务接口定义
│       │── dao // 数据访问接口
│       │── email // 邮件发送接口
│       │── mobile // 短信发送接口
│       └── pay // 支付接口
└── util // 工具类


为了减少代码冗余,将各微服务中的公用代码抽取到了 common 包中,其目录结构跟微服务类似。

.
├── adapter // 适配层
│   ├── api
│   └── cli
└── util  // 适配层
   ├── encoder
   ├── generator
   ├── jwt
   └── pagination

Plain

Copy

为了方便客户端调用 API(包括微服务之间相互调用),将各微服务的 API 封装到了 client 包下,client 包可以被 common 包引用,但不能反过来 。

.
├── ApiResultDecoder.java
├── async // 异步调用,适合 WebFlux 环境
│   ├── ApiConfiguration.java
│   └── user
├── dto // API 数据传输对象
│   ├── ApiResult.java
│   ├── storage
│   └── user
├── exception // API 异常
│   ├── ApiException.java
│   ├── UsecaseCode.java
│   └── UsecaseException.java
└── sync // 同步调用
   ├── ApiConfiguration.java
   ├── UserRelayRequestInterceptor.java
   ├── storage
   └── user

Maven 规范

  1. 各微服务的依赖包版本统一在 parent 项目里指定,其它项目引入依赖包时不要指定版本号;

  2. POM 文件里依赖包引入按底层(比如驱动)到上层(比如 ORM 框架)的顺序,同一个 Group 的包相邻放置,测试相关的依赖包放在后面;

  3. common 项目里不要直接引入 Spring Boot Starter 依赖包(比如 com.baomidou.

mybatis-plus-boot-starter),而是要引入底层包(比如 com.baomidou.mybatis-plus),以免其它项目引入 common 包时执行了不需要的自动配置;

异常规范

  1. 用例层使用 UsecaseException,适配层使用 ApiExceptionApiException 相比于 UsecaseException 可以指定 HTTP 响应状态码;

数据访问层规范

库表迁移规范

  1. 迁移文件统一放在 ops/db/migration 目录下,文件命名遵循 Flyway 规范,在此规范上稍做补充,规则为:项目版本号_递增数字__描述比如 V0.0.1_2__Add_new_table.sql

  2. 为了可控性和简单性,使用手动迁移而不是 Flyway 自动迁移,并且只使用 V 前缀的升级文件;

  3. 每次提交测试时(包括 Bug 修复),如有升级库表结构,那么需要创建一个新的升级文件,版本号依次增加;

Redis 使用规范

引入 Spring Data Redis 依赖。

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId></dependency>


Spring Boot 配置。

package ai.basic.basicai.user.adapter.api.config;/**
* @author Jagger Wang
*/@Configurationpublic class RedisConfig {
   @Value("${spring.redis.host}")
   private String host;

   @Value("${spring.redis.port}")
   private int port;

   @Value("${spring.redis.password}")
   private String password;

   /**
    * 将对象以 JSON 格式保存到 Redis 里。不推荐使用默认的 JdkSerializationRedisSerializer,存在安全风险,
    * 可读性也不好。
    * @param objectMapperBuilder
    * @return
    */
   @Bean
   public GenericJackson2JsonRedisSerializer jsonRedisSerializer(
           Jackson2ObjectMapperBuilder objectMapperBuilder) {
       var objectMapper = objectMapperBuilder.build();

       // 配置 ObjectMapper,使得在序列化后的 JSON 对象里存放类型信息,以便反序列化时能获得正确类型的对象
       GenericJackson2JsonRedisSerializer.registerNullValueSerializer(objectMapper, null);
       objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(),
               ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);

       return new GenericJackson2JsonRedisSerializer(objectMapper);
   }

   /**
    * 用来将对象转换为 Map,以便保存为 Redis Hash。
    * @param objectMapper
    * @return
    */
   @Bean
   public Jackson2HashMapper hashMapper(ObjectMapper objectMapper) {
       return new Jackson2HashMapper(objectMapper, false);
   }

   /**
    * 使用功能更强大和完善的 Lettuce,而非 Jedis
    * @return
    */
   @Bean
   public LettuceConnectionFactory redisConnectionFactory() {
       var serverConfig = new RedisStandaloneConfiguration(host, port);
       serverConfig.setPassword(password);

       var clientConfig = LettucePoolingClientConfiguration.builder()
               .readFrom(REPLICA_PREFERRED)
               .build();

       return new LettuceConnectionFactory(serverConfig, clientConfig);
   }

   @Bean
   public RedisTemplate<String, Object> redisTemplate(
           RedisConnectionFactory redisConnectionFactory,
           GenericJackson2JsonRedisSerializer jsonRedisSerializer) {
       var template = new RedisTemplate<String, Object>();
       template.setConnectionFactory(redisConnectionFactory);
       template.setKeySerializer(RedisSerializer.string());
       template.setValueSerializer(jsonRedisSerializer);
       template.setHashKeySerializer(RedisSerializer.string());
       template.setHashValueSerializer(jsonRedisSerializer);
       return template;
   }

   @Bean
   public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
       return new StringRedisTemplate(redisConnectionFactory);
   }}


如果是把 Redis 当作 Key-Value 数据库来使用,推荐采用类似数据库表访问的 DAO 模式,为每种类型的数据创建一个 DAO 对象,每个 Key 对应一个主键 ID。 首先,定义 DAO 接口。通过 DAO 接口可以屏蔽底层的实现细节,对于调用者来说不用关心数据存放在哪,是 MySQL 这样的关系数据库还是 Redis 这样的 Key-Value 数据库,方便后续替换底层存储引擎。

package ai.basic.basicai.user.usecase.port.dao;/**
* @author Jagger Wang
*/public interface CounterDAO {

   /**
    * 获取计数
    * @param key Key
    * @return 计数
    */
   Long get(String key);

   /**
    * 设置计数
    * @param key Key
    * @param value 计数
    */
   void set(String key, Long value);

   /**
    * 变化量,返回更新后的计数,并发安全
    * @param key Key
    * @param delta 变化量
    * @return 更新后的计数
    */
   Long inc(String key, Long delta);}


其次,实现 DAO。这里把一些公共的约定,比如 Key 命名前缀,提取到了抽象基类 AbstractRedisDAO 里。Key 命名遵循 <app>:<service>:<table>:<id> 规则,比如 basicai:user:user:1

package ai.basic.basicai.user.adapter.port.dao;/**
* @author Jagger Wang
*/@Componentpublic class CounterDAOImpl extends AbstractRedisDAO<RedisTemplate<String, Object>>
       implements CounterDAO {

   public CounterDAOImpl(RedisTemplate<String, Object> template) {
       super(template, "user", "counter");
   }

   @Override
   public Long get(String key) {
       var value = (Integer) template.opsForValue().get(prefixedKey(key));
       if (value == null) {
           return 0L;
       }

       return Long.valueOf(value);
   }

   @Override
   public void set(String key, Long value) {
       template.opsForValue().set(prefixedKey(key), value);
   }

   @Override
   public Long inc(String key, Long delta) {
       return template.opsForValue().increment(prefixedKey(key), delta);
   }}


这里没有使用功能更强大也更复杂的 Redis Repository 来保存对象到 Redis Hash 里,也不推荐在 Redis 里存放结构比较复杂和对可靠性要求比较高的数据。 参考资料:

  • Spring Data Redis 官方文档

  • Introduction to Spring Data Redis

缓存使用规范

引入 Spring Cache 依赖。

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-cache</artifactId></dependency>


Spring Boot 配置。

package ai.basic.basicai.user.adapter.api.config;/**
* @author Jagger Wang
*/@Configuration
@EnableCachingpublic class CacheConfig extends CachingConfigurerSupport {

   /**
    * 配置各个 Cache,包括 TTL、Prefix 等
    * @param jsonRedisSerializer
    * @return
    */
   @Bean    public RedisCacheManagerBuilderCustomizer redisCacheManagerBuilderCustomizer(
           GenericJackson2JsonRedisSerializer jsonRedisSerializer    ) {
       var defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig()
               .entryTtl(Duration.ofMinutes(60))
               .serializeValuesWith(RedisSerializationContext.SerializationPair                        .fromSerializer(jsonRedisSerializer))
               .computePrefixWith(new CacheKeyPrefix() {
                   String SEPARATOR = ":";

                   @Override                    public String compute(String cacheName) {
                       return "basicai" + SEPARATOR + "user" + SEPARATOR + cacheName + SEPARATOR;
                   }
               });

       return (builder) -> builder                .withCacheConfiguration("user", defaultCacheConfig                        .entryTtl(Duration.ofMinutes(12 * 60)));
   }}


使用 Spring Cache 的注解来自动缓存方法结果。推荐明确指定 key,而不是依赖 Spring Cache 自动生成的,更具可读性。

package ai.basic.basicai.user.adapter.port.dao;/**
* @author Jagger Wang
*/@Componentpublic class UserDAOImpl extends AbstractDAO implements UserDAO {
   private final UserMapper userMapper;

   public UserDAOImpl(UserMapper userMapper) {
       this.userMapper = userMapper;
   }

   @Override
   @CachePut(cacheNames = "user", key = "#userBO.getId()")
   public UserBO saveOrUpdate(UserBO userBO) {
       User user = User.fromBO(userBO);
       userMapper.saveOrUpdate(user);
       return user.toBO();
   }

   @Override
   @Cacheable(cacheNames = "user", key = "#id")
   public Optional<UserBO> findById(Long id) {
       return Optional.ofNullable(userMapper.selectById(id))
               .map(User::toBO);
   }}


参考资料:

  • Spring Cache 官方文档

  • Spring Boot Cache with Redis

Job 规范

Job 用来执行一些后台任务,比如缩略图生成、消息处理等,Job 跟 API 服务一样,都是经由 Kubernetes 来调度运行,具体可参考官方文档 Jobs。

应用入口

为了避免同一个项目针对不同运行方式分别打包,统一使用一个应用来启动 Web 服务和执行 Job,通过命令行参数来区分以哪种方式运行,同时通过 --job 参数来区分不同 Job。

package ai.basic.basicai.user.adapter;@SpringBootApplication(scanBasePackages = "ai.basic.basicai.user.adapter")@MapperScan("ai.basic.basicai.user.adapter.port.dao.mybatis.mapper")@EnableFeignClients(basePackages = "ai.basic.basicai.client.sync")@EnableAsyncpublic class Application implements ApplicationRunner {
  private static Logger logger = LoggerFactory.getLogger(Application.class);

  public static void main(String[] args) {
     var app = new SpringApplication(Application.class);
     // 未指定任何命令行参数时以 Web 服务方式运行,否则以命令方式运行
     if (args.length == 0) {
        app.setWebApplicationType(WebApplicationType.SERVLET);
        app.run(args);
     } else {
        app.setWebApplicationType(WebApplicationType.NONE);
        System.exit(SpringApplication.exit(app.run(args)));
     }
  }

  @Override
  public void run(ApplicationArguments args) throws Exception {
     var values = args.getOptionValues("job");
     // 未指定 Job 时以 Web 服务方式运行,无需执行任何命令
     if (values == null) {
        return;
     }

     var job = "";
     if (!values.isEmpty()) {
        job = values.get(0);
     }

     switch (job) {
        case "short":
           // 如有命令行参数可通过构造函数参数传递给 Job。如果需要自动注入依赖对象,可定义一个 Job Bean。
           new ShortJob(job).run();
           break;
        case "long":
           new LongJob(job).run();
           break;
        default:
           logger.error(String.format("unknown job %s", job));
     }
  }}


编写 Job

所有 Job 统一放在 cli.job 包下,继承于 AbstractJob 基类。

package ai.basic.basicai.user.adapter.cli.job;public class ShortJob extends AbstractJob {
   private static Logger logger = LoggerFactory.getLogger(ShortJob.class);

   public ShortJob(String name) {
       super(name);
   }

   @Override
   protected void work() {
       logger.info("do some work");
   }}

Java

Copy

构建镜像

此外,在 Dockerfile 里最好使用 ENTRYPOINT 而不是 CMD 来指定启动命令,这样在执行 Job 时只需指定命令参数,否则需要指定完整的命令。

FROM openjdk:11RUN apt update && \
   apt install -y iputils-ping curl wget netcatWORKDIR /appCOPY target/$JAR_PACKAGE_NAME ./RUN mkdir -p configEXPOSE 8080ENTRYPOINT ["java", "-Djava.security.egd=file:/dev/./urandom", "-jar", "$JAR_PACKAGE_NAME"]


执行  Job

假设镜像名为 registry.talos.basic.ai/basicai/backend/user,那么可以按如下的命令来分别启动 Web 服务和执行 Job。

# 启动 Web 服务docker container run --rm registry.talos.basic.ai/basicai/backend/user# 执行 Jobdocker container run --rm registry.talos.basic.ai/basicai/backend/user --job=short

Bash

Copy

在 Kubernetes 里可通过下面的部署文件来执行有结束状态的短期 Job。对于每次部署时都需要执行的长期 Job(比如消息队列消费者),需要使用 Deployment 来部署,并且建议添加到 CI deployment 文件里,跟 API 服务一起部署。

# 有结束状态的短期 Job,需手动执行,执行之前需先删除现有同名 Job,Kubernetes Job 不允许更新apiVersion: batch/v1kind: Jobmetadata:
 name: user-shortjobspec:
 # 完成次数
 completions: 1
 # 并发数
 parallelism: 1
 # 失败重试次数
 backoffLimit: 6
 selector:
   matchLabels:
     job-name: user-shortjob  template:
   metadata:
     labels:
       job-name: user-shortjob    spec:
     # 提前准备好拉取镜像的凭证
     imagePullSecrets:
       - name: basicai-registry      restartPolicy: Never      containers:
       - name: user-shortjob          # 生产环境请使用正确的版本号
         image: registry.talos.basic.ai/basicai/backend/user:dev          args:
           - --job=short          resources:
           requests:
             memory: 400Mi              cpu: 0.2
           limits:
             memory: 4Gi              cpu: 2
         volumeMounts:
           - name: config              mountPath: /app/config      volumes:
       - name: config          configMap:
           name: user


单元测试规范

因为集成测试会依赖各种外部服务,执行起来比较困难也比较慢,并且通过功能强大的 API 测试工具(比如 Apifox)就能够基本达到集成测试的要求,因此这里不推荐编写集成测试,而是把精力集中到更易于执行的单元测试上。对于单元测试,也要重点针对业务逻辑复杂的用例层接口去做,不需要对只进行简单增删改查的 DAO 接口去做。

用例层单元测试

package ai.basic.basicai.user.usecase;...@ExtendWith(SpringExtension.class)public class UserUsecaseTest {

   @TestConfiguration
   static class UserControllerTestConfiguration {

       @Bean
       public UserUsecase userUsecase() {
           return new UserUsecase();
       }
   }

   @Autowired
   private UserUsecase userUsecase;

   @MockBean
   private UserDAO userDAO;

   @MockBean
   private RoleDAO roleDAO;

   @MockBean
   private RandomGenerator randomGenerator;

   @MockBean
   private PasswordEncoder passwordEncoder;

   @MockBean
   private UserEmailPublisher emailPublisher;

   @MockBean
   private StorageService storageService;

   @BeforeEach
   public void setUp() {
       var jagger = UserBO.builder().id(1L)
               .username("jaggerwang")
               .email("jaggerwang@gmail.com")
               .nickname("Jagger Wang")
               .build();

       Mockito.when(userDAO.findById(jagger.getId(), UserBO.Status.ACTIVE))
               .thenReturn(Optional.of(jagger));
   }

   @Test
   public void whenValidId_thenUserShouldBeFound() {
       var id = (Long) 1L;
       var user = userUsecase.info(id);

       assertDoesNotThrow(() -> {
           user.get();
       });

       assertEquals(id, user.get().getId());
   }}

Java

Copy

  1. 使用了 @ExtendWith 注解而不是 @SpringBootTest,避免启动一个完整的应用上下文,从而加快单元测试执行速度;

  2. 使用 @TestConfiguration 注解来声明了一个只给本单元测试使用的配置类,里面可以定义本单元测试里需要自动注入的 Bean,比如被测试的用例对象;

  3. 对于被测试的用例对象依赖的其它 Bean 对象,可以通过 @MockBean 注解来声明;

  4. 在 setUp 方法里使用 Mockito 来 Mock 被测试的用例对象方法里所调用的其它依赖对象的方法,注意这里不需要 Mock 依赖对象的所有方法;

控制器层单元测试

package ai.basic.basicai.user.adapter.api.controller;...@ExtendWith(SpringExtension.class)@WebMvcTest(UserController.class)@ContextConfiguration(classes = TestApplication.class)public class UserControllerTest {

   @Autowired
   private MockMvc mvc;

   @MockBean
   private UserUsecase userUsecase;

   @MockBean
   private TeamUsecase teamUsecase;

   @BeforeEach
   public void setUp() {
       var jagger = UserBO.builder().id(1L)
               .username("jaggerwang")
               .email("jaggerwang@gmail.com")
               .nickname("Jagger Wang")
               .build();

       Mockito.when(userUsecase.info(jagger.getId()))
               .thenReturn(Optional.of(jagger));

       Mockito.when(teamUsecase.findMemberByUserId(jagger.getId()))
               .thenReturn(List.of());
   }

   @Test
   public void whenValidId_thenUserShouldBeFound() throws Exception {
       var id = 1;

       mvc.perform(MockMvcRequestBuilders.get(String.format("/user/info/%d", id)))
               .andExpect(MockMvcResultMatchers.status().isOk())
               .andExpect(MockMvcResultMatchers.jsonPath("$.code", Matchers.is("OK")))
               .andExpect(MockMvcResultMatchers.jsonPath("$.data.user.id", Matchers.is(id)));
   }}


  1. 相比用例层单元测试,控制器层单元测试使用了 @ContextConfiguration 注解来指定了测试配置类,否则会自动向上搜索到主应用类,从而导致自动配置各种不需要的组件,还会引起报错;

单元测试编写好后,注意打开 CI/CD 流水线里 Maven 构建任务的测试开关,以便在每次构建时自动执行单元测试。

部署规范

资源使用规范

  1. 容器遵循单一职责原则,不要在单个容器里同时运行多种计算类型的任务,比如把前台 API 服务跟后台异步任务混合在一起。每种计算类型的任务应独立为单独的容器,一是可以避免任务之间相互影响,二是方便 Kubernetes 把不同计算类型的任务调度到对应的节点上并配置不同的升级策略,三是可以针对不同计算类型的任务配置重启策略或优雅终止方式。

  2. Java 8u191 及 Java 9 之后已经可以正确感知到容器资源限制,为了避免 JVM 堆内存设置跟容器限制不同步,请使用 InitialRAMPercentage(初始堆内存大小,默认为 25%)、MinRAMPercentage(容器内存小于 200MB 时的最大堆内存) 和 MaxRAMPercentage(容器内存大于 200MB 时的最大堆内存) 按百分比来设置堆内存用量;

  3. 为不同计算类型的任务设置合理的 Resource Limit,如果不清楚可以先按小的设,后续再按需扩容。推荐前台 API 服务 CPU 1 Memory 2Gi,后台 Job 任务 CPU 500m Memory 1Gi;

容器应用优雅终止规范

  1. Kubernetes 接收到停止 Pod 指令后,会将 Pod 标记为 Terminating 状态,并停止转发流量给该 Pod,然后通知容器运行时停止 Pod 下的各个容器;

  2. 容器运行时发送 TERM 信号(默认为 SIGTERM,可在 Dockerfile 里通过 STOPSIGNAL 指令修改)给容器内的所有进程,Kubernetes 等待该容器停止,等待时间默认为 30s(可通过 terminationGracePeriodSeconds 来修改);

  3. 容器内的应用如果需要优雅终止,那么需处理 SIGTERM 信号,等待现有任务完成、清理资源等;

  4. 如果等待容器停止超时,Kubernetes 将触发强制关闭过程,此时容器运行时会发送 SIGKILL 给容器内的所有进程,系统强制杀掉所有进程,然后容器终止;

  5. Kubernetes 执行其它 Pod 相关的资源清理操作;

参考资料

  • Pod 的生命周期

  • Pod Termination

Spring 应用优雅关闭

  1. 为了避免应用关闭时强行终止当前请求的处理,可设置 server.shutdown=graceful,默认等待时间为 30s,该值可通过 spring.lifecycle.timeout-per-shutdown-phase 来修改;

  2. Spring 默认的 TaskExecutor 会强行终止正在执行的任务,可通过 setWaitForTasksToCompleteOnShutdown 方法来开启等待,为了避免无限等待,可通过 setAwaitTerminationSeconds 方法来设置等待时间;

  3. Spring 应用可以通过实现 ApplicationListener<ContextClosedEvent> 来处理 SIGTERM 信号,比如对于一个后台的 Pulsar Consumer,可以在消息处理间隙去检查应用是否收到了终止信号,如果是则终止消息处理循环;


本文作者:Jagger Wang 来源:CIO之家的朋友们
CIO之家 www.ciozj.com 微信公众号:imciow
    >>频道首页  >>网站首页   纠错  >>投诉
版权声明:CIO之家尊重行业规范,每篇文章都注明有明确的作者和来源;CIO之家的原创文章,请转载时务必注明文章作者和来源;
延伸阅读