springboot-starter如何整合阿里云datahub呢?

书欣 SpringBoot 发布时间:2022-09-01 18:00:59 阅读数:5609 1
下文笔者讲述springboot整合datahub的方法分享,如下所示

Datahub简介说明

DataHub的功能:
    1.与大数据解决方案中Kafka具有相同的角色
      同时还提供数据队列功能
    2.DataHub还可与阿里云其它上下游产品对接
	  其一个交换的功能,称之为数据交换

DataHub 简介

datahub对外提供开发者生产和消费的sdk
 在springboot中,我们也可用使用自定义starter的方式加载sdk
实现思路:
    1.引入相应的starter器
	2.application.yml中加入相应的配置信息
	3.编写相应的代码

引入相应的starter器

<dependency>
      <artifactId>cry-starters-projects</artifactId>
      <groupId>cn.com.cry.starters</groupId>
      <version>2022-1.0.0</version>
</dependency>

启动客户端

配置阿里云DataHub的endpoint以及AK信息
aliyun:
  datahub:
  	# 开启功能
  	havingValue: true
    #是否为私有云
    isPrivate: false
    accessId: xxx
    accessKey: xxx
    endpoint: xxx
    #连接DataHub客户端超时时间
    conn-timeout: 10000

获取DataHub客户端

DatahubClient datahubClient=DataHubTemplate.getDataHubClient();

写数据

public int write(@RequestParam("id") Integer shardId) {
    list<Student> datas = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Student s = new Student();
        s.setAge(i);
        s.setName("name-" + i);
        s.setAddress("address-" + i);
        datas.add(s);
    }
    int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
    return successNumbers;
}
 上述代码说明:
      projectName为my_test
      topicName为student
      shardId 为N的hub里写数据
      且返回插入成功的条数

读数据

读数据开发的逻辑类似RabbitMq的starter
使用@DataHubListener和@DataHubHandler处理器注解进行使用
@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {

    @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
    public void handler(Message message) {
        System.out.println("读取到shardId=0的消息");
        System.out.println(message.getData());
        System.out.println(message.getCreateTsime());
        System.out.println(message.getSize());
        System.out.println(message.getConfig());
        System.out.println(message.getMessageId());
    }
}
 
以上代码
    通过LATEST游标的方式
    监听 project=my_test
         topicName=student
         shardId=0
     最终使用Message的包装类
	 获取dataHub实时写入的数据 
此处可设置多种游标类型
  例:根据最新的系统时间、最早录入的序号等

核心代码

需一个DataHubClient增强类
在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId
根据游标规则来读取当前的cursor进行数据的读取
public class DataHubClientWrapper implements InitializingBean, DisposableBean {

    @Autowired
    private AliyunAccountProperties properties;

    @Autowired
    private ApplicationContext context;

    private DatahubClient datahubClient;


    public DataHubClientWrapper() {

    }

    /**
     * 执行销毁方法
     *
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        WorkerResourceExecutor.shutdown();
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        /**
         * 创建DataHubClient
         */
        this.datahubClient = DataHubClientFactory.create(properties);

        /**
         * 打印Banner
         */
        BannerUtil.printBanner();

        /**
         * 赋值Template的静态对象dataHubClient
         */
        DataHubTemplate.setDataHubClient(datahubClient);

        /**
         * 初始化Worker线程
         */
        WorkerResourceExecutor.initWorkerResource(context);
        /**
         * 启动Worker线程
         */
        WorkerResourceExecutor.start();
    }
}
 
//写数据
//构建了一个类似RedisDataTemplate的模板类
//封装了write的逻辑
//调用时只需要用DataHubTemplate.write调用

public class DataHubTemplate {
    private static DatahubClient dataHubClient;
    private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);

    /**
     * 默认不开启重试机制
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @return
     */
    public static int write(String projectName, String topicName, List<?> datas, Integer shardId) {
        return write(projectName, topicName, datas, shardId, false);
    }

    /**
     * 往指定的projectName以及topic和shard下面写数据
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @param retry
     * @return
     */
    private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) {
        RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (Object o : datas) {
            RecordEntry entry = new RecordEntry();
            Map<String, Object> data = BeanUtil.beanToMap(o);
            TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
            for (String key : data.keySet()) {
                tupleRecordData.setField(key, data.get(key));
            }
            entry.setRecordData(tupleRecordData);
            entry.setShardId(String.valueOf(shardId));
            recordEntries.add(entry);
        }
        PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
        int failedRecordCount = result.getFailedRecordCount();
        if (failedRecordCount > 0 && retry) {
            retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
        }
        return datas.size() - failedRecordCount;
    }

    /**
     * @param client
     * @param records
     * @param retryTimes
     * @param project
     * @param topic
     */
    private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        List<RecordEntry> failedRecords = records;
        while (retryTimes != 0) {
            logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
            retryTimes = retryTimes - 1;
            PutRecordsResult result = client.putRecords(project, topic, failedRecords);
            int failedNum = result.getFailedRecordCount();
            if (failedNum > 0) {
                failedRecords = result.getFailedRecords();
                continue;
            }
            suc = true;
            break;
        }
        if (!suc) {
            logger.error("DataHub send message retry failure");
        }
    }

    public static DatahubClient getDataHubClient() {
        return dataHubClient;
    }

    public static void setDataHubClient(DatahubClient dataHubClient) {
        DataHubTemplate.dataHubClient = dataHubClient;
    }
}

//读数据
//需要在Spring启动时开启一个监听线程DataListenerWorkerThread
//执行一个死循环不停轮询DataHub下的对应通道

public class DataListenerWorkerThread extends Thread {
    private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
    private volatile boolean init = false;
    private DatahubConfig config;
    private String workerKey;
    private int recordLimits;
    private int sleep;
    private RecordSchema recordSchema;
    private RecordHandler recordHandler;
    private CursorHandler cursorHandler;

    public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringredisTemplate redisTemplate) {
        this.config = new DatahubConfig(projectName, topicName, shardId);
        this.workerKey = projectName + "-" + topicName + "-" + shardId;
        this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
        this.recordLimits = recordLimits;
        this.sleep = sleep;
        this.setName("DataHub-Worker");
        this.setDaemon(true);
    }

    @Override
    public void run() {
        initRecordSchema();
        String cursor = cursorHandler.positioningCursor(config);
        for (; ; ) {
            try {
                GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
                if (result.getRecordCount() <= 0) {
                    // 无数据,sleep后读取
                    Thread.sleep(sleep);
                    continue;
                }
                List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords());
                logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());
                // 拿到下一个游标
                cursor = cursorHandler.nextCursor(result);
                //执行方法
                WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);
            } catch (InvalidParameterException ex) {
                //非法游标或游标已过期,建议重新定位后开始消费
                cursor = cursorHandler.resetCursor(config);
                logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());
            } catch (DatahubClientException e) {
                logger.error("DataHubException:{}", e.getErrorMessage());
                this.interrupt();
            } catch (InterruptedException e) {
                logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());
            } catch (Exception e) {
                this.interrupt();
                logger.error("receive DataHub records cry.exception:{}", e, e);
            }
        }
    }

    /**
     * 终止
     */
    public void shutdown() {
        if (!interrupted()) {
            interrupt();
        }
    }

    /**
     * 初始化topic字段以及recordSchema
     */
    private void initRecordSchema() {
        try {
            if (!init) {
                recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();
                List<Field> fields = recordSchema.getFields();
                this.recordHandler = new RecordHandler(fields);
                init = true;
            }
        } catch (Exception e) {
            logger.error("initRecordSchema error:{}", e, e);
        }
    }
}

//read的时候结合了注解开发
//通过定义类注解DataHubListener和方法注解DataHubHandler内置属性
//来动态的控制需要在哪些方法中处理监听到的数据的逻辑:

DataHubHandler

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.Runtime)
@Documented
public @interface DataHubHandler {
    /**
     * 话题名称
     *
     * @return
     */
    String topicName();

    /**
     * shardId
     *
     * @return
     */
    int shardId();

    /**
     * 最大数据量限制
     *
     * @return
     */
    int recordLimit() default 1000;

    /**
     * 游标类型
     *
     * @return
     */
    CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST;

    /**
     * 若未监听到数据添加,休眠时间 ms
     *
     * @return
     */
    int sleep() default 10000;

    /**
     * 使用CursorType.SYSTEM_TIME的时候配置 时间偏移量
     *
     * @return
     */
    String startTime() default "";

    /**
     * 使用使用CursorType.SEQUENCE的时候配置,偏移量,必须是正整数
     *
     * @return
     */
    int sequenceOffset() default 0;
}

 
DataHubListener

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {
    String projectName();
}
 
//启动SpringBootStarter的EnableConfigurationProperties注解
//使用配置文件来控制default-bean的开启或关闭

启动类
@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {
    /**
     * 初始化dataHub装饰bean
     *
     * @return
     */
    @Bean
    public DataHubClientWrapper dataHubWrapper() {
        return new DataHubClientWrapper();
    }

}
 
//属性配置类
@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{

    /**
     * http://xxx.aliyuncs.com
     */
    private String endpoint;

    /**
     * account
     */
    private String accessId;

    /**
     * password
     */
    private String accessKey;

    /**
     * private cloud || public cloud
     */
    private boolean isPrivate;

    /**
     * unit: ms
     */
    private Integer connTimeout = 10000;
} 

最后记得要做成一个starter
在resources下新建一个META-INF文件夹
新建一个spring.factories文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration= \
  cry.starter.datahub.DataHubClientAutoConfiguration
 
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

本文链接: https://www.Java265.com/JavaFramework/SpringBoot/202209/4294.html

最近发表

热门文章

好文推荐

Java265.com

https://www.java265.com

站长统计|粤ICP备14097017号-3

Powered By Java265.com信息维护小组

使用手机扫描二维码

关注我们看更多资讯

java爱好者