当前位置: 首页 > news >正文

拓者设计吧邀请码seo优化多少钱

拓者设计吧邀请码,seo优化多少钱,免费自助建站网站建设免费信息发布,贵州省建设厅官网网站引言 kafka-connect-jdbc 是一个 Kafka Connector,用于在任何兼容 JDBC 的数据库和 Kafka 之间加载数据。它支持从数据库中读取数据并将其写入 Kafka 主题,也可以将 Kafka 主题中的数据写入数据库。 核心类与组件 1. JdbcSourceConnector 功能&#…

引言

kafka-connect-jdbc 是一个 Kafka Connector,用于在任何兼容 JDBC 的数据库和 Kafka 之间加载数据。它支持从数据库中读取数据并将其写入 Kafka 主题,也可以将 Kafka 主题中的数据写入数据库。

核心类与组件

1. JdbcSourceConnector
  • 功能:这是 Kafka Connect 的源连接器实现类,负责监控 JDBC 数据库并生成任务以摄取数据库内容。
  • 关键代码分析
public class JdbcSourceConnector extends SourceConnector {// ... 省略部分代码@Overridepublic void start(Map<String, String> properties) throws ConnectException {log.info("Starting JDBC Source Connector");try {configProperties = properties;config = new JdbcSourceConnectorConfig(configProperties);} catch (ConfigException e) {throw new ConnectException("Couldn't start JdbcSourceConnector due to configuration error", e);}// 获取数据库连接final String dbUrl = config.getString(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG);final int maxConnectionAttempts = config.getInt(JdbcSourceConnectorConfig.CONNECTION_ATTEMPTS_CONFIG);final long connectionRetryBackoff = config.getLong(JdbcSourceConnectorConfig.CONNECTION_BACKOFF_CONFIG);dialect = DatabaseDialects.findBestFor(dbUrl, config);cachedConnectionProvider = connectionProvider(maxConnectionAttempts, connectionRetryBackoff);cachedConnectionProvider.getConnection();// 启动表监控线程long tablePollMs = config.getLong(JdbcSourceConnectorConfig.TABLE_POLL_INTERVAL_MS_CONFIG);long tableStartupLimitMs = config.getLong(JdbcSourceConnectorConfig.TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_CONFIG);List<String> whitelist = config.getList(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG);Set<String> whitelistSet = whitelist.isEmpty() ? null : new HashSet<>(whitelist);List<String> blacklist = config.getList(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG);Set<String> blacklistSet = blacklist.isEmpty() ? null : new HashSet<>(blacklist);tableMonitorThread = new TableMonitorThread(dialect,cachedConnectionProvider,context,tableStartupLimitMs,tablePollMs,whitelistSet,blacklistSet,Time.SYSTEM);if (config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG).isEmpty()) {tableMonitorThread.start();log.info("Starting Table Monitor Thread");}}@Overridepublic Class<? extends Task> taskClass() {return JdbcSourceTask.class;}@Overridepublic List<Map<String, String>> taskConfigs(int maxTasks) {String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);List<Map<String, String>> taskConfigs;if (!query.isEmpty()) {// 自定义查询Map<String, String> taskProps = new HashMap<>(configProperties);taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, "");taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");taskConfigs = Collections.singletonList(taskProps);} else {// 表模式List<TableId> currentTables = tableMonitorThread.tables();if (currentTables == null || currentTables.isEmpty()) {// 没有找到表taskConfigs = new ArrayList<>(1);Map<String, String> taskProps = new HashMap<>(configProperties);taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, "");taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, currentTables == null ? "false" : "true");taskConfigs.add(taskProps);} else {// 分配表到任务int numGroups = Math.min(currentTables.size(), maxTasks);List<List<TableId>> tablesGrouped = ConnectorUtils.groupPartitions(currentTables, numGroups);taskConfigs = new ArrayList<>(tablesGrouped.size());for (List<TableId> taskTables : tablesGrouped) {Map<String, String> taskProps = new HashMap<>(configProperties);ExpressionBuilder builder = dialect.expressionBuilder();builder.appendList().delimitedBy(",").of(taskTables);taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, builder.toString());taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");taskConfigs.add(taskProps);}}}return taskConfigs;}@Overridepublic void stop() throws ConnectException {log.info("Stopping table monitoring thread");tableMonitorThread.shutdown();try {tableMonitorThread.join(MAX_TIMEOUT);} catch (InterruptedException e) {// Ignore, shouldn't be interrupted} finally {cachedConnectionProvider.close(true);if (dialect != null) {dialect.close();}}}
}
  • 详细解释
    • start 方法:初始化连接器配置,获取数据库连接,启动表监控线程(如果没有自定义查询)。
    • taskClass 方法:返回任务类 JdbcSourceTask
    • taskConfigs 方法:根据配置生成任务配置。如果有自定义查询,生成一个任务配置;否则,根据表的数量和最大任务数分配表到任务。
    • stop 方法:停止表监控线程,关闭数据库连接和方言。
2. JdbcSourceTask
  • 功能:Kafka Connect 的源任务实现类,负责从 JDBC 数据库中读取数据并生成 Kafka Connect 记录。
  • 关键代码分析
public class JdbcSourceTask extends SourceTask {// ... 省略部分代码@Overridepublic void start(Map<String, String> properties) {log.info("Starting JDBC source task");try {config = new JdbcSourceTaskConfig(properties);} catch (ConfigException e) {throw new ConfigException("Couldn't start JdbcSourceTask due to configuration error", e);}List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED);String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);if ((tables.isEmpty() && query.isEmpty())) {if (!tablesFetched) {// 等待表信息获取taskThreadId.set(Thread.currentThread().getId());log.info("Started JDBC source task. Waiting for DB tables to be fetched.");return;}// 没有分配表或查询throw new ConfigException("Task is being killed because it was not assigned a table nor a query to execute.");}if ((!tables.isEmpty() && !query.isEmpty())) {// 表和查询不能同时分配throw new ConfigException("Invalid configuration: a JdbcSourceTask cannot have both a table and a query assigned to it");}// 获取数据库连接final String url = config.getString(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG);final int maxConnAttempts = config.getInt(JdbcSourceConnectorConfig.CONNECTION_ATTEMPTS_CONFIG);final long retryBackoff = config.getLong(JdbcSourceConnectorConfig.CONNECTION_BACKOFF_CONFIG);final String dialectName = config.getString(JdbcSourceConnectorConfig.DIALECT_NAME_CONFIG);if (dialectName != null && !dialectName.trim().isEmpty()) {dialect = DatabaseDialects.create(dialectName, config);} else {dialect = DatabaseDialects.findBestFor(url, config);}cachedConnectionProvider = connectionProvider(maxConnAttempts, retryBackoff);// 设置事务隔离级别dialect.setConnectionIsolationMode(cachedConnectionProvider.getConnection(),TransactionIsolationMode.valueOf(config.getString(JdbcSourceConnectorConfig.TRANSACTION_ISOLATION_MODE_CONFIG)));// 确定查询模式TableQuerier.QueryMode queryMode = !query.isEmpty() ? TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE;List<String> tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY ? Collections.singletonList(query) : tables;// 获取偏移量String mode = config.getString(JdbcSourceTaskConfig.MODE_CONFIG);Map<String, List<Map<String, String>>> partitionsByTableFqn = new HashMap<>();Map<Map<String, String>, Map<String, Object>> offsets = null;if (mode.equals(JdbcSourceTaskConfig.MODE_INCREMENTING)|| mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP)|| mode.equals(JdbcSourceTaskConfig.MODE_TIMESTAMP_INCREMENTING)) {List<Map<String, String>> partitions = new ArrayList<>(tables.size());switch (queryMode) {case TABLE:for (String table : tables) {List<Map<String, String>> tablePartitions = possibleTablePartitions(table);partitions.addAll(tablePartitions);partitionsByTableFqn.put(table, tablePartitions);}break;case QUERY:partitions.add(Collections.singletonMap(JdbcSourceConnectorConstants.QUERY_NAME_KEY, JdbcSourceConnectorConstants.QUERY_NAME_VALUE));break;default:throw new ConfigException("Unknown query mode: " + queryMode);}offsets = context.offsetStorageReader().offsets(partitions);}// 创建表查询器for (String tableOrQuery : tablesOrQuery) {final List<Map<String, String>> tablePartitionsToCheck;final Map<String, String> partition;switch (queryMode) {case TABLE:tablePartitionsToCheck = partitionsByTableFqn.get(tableOrQuery);break;case QUERY:partition = Collections.singletonMap(JdbcSourceConnectorConstants.QUERY_NAME_KEY, JdbcSourceConnectorConstants.QUERY_NAME_VALUE);tablePartitionsToCheck = Collections.singletonList(partition);break;default:throw new ConfigException("Unexpected query mode: " + queryMode);}Map<String, Object> offset = null;if (offsets != null) {for (Map<String, String> toCheckPartition : tablePartitionsToCheck) {offset = offsets.get(toCheckPartition);if (offset != null) {break;}}}TableQuerier querier = new TableQuerier(queryMode,tableOrQuery,config.getString(JdbcSourceTaskConfig.MODE_CONFIG),config.getString(JdbcSourceTaskConfig.INCREMENTING_COLUMN_NAME_CONFIG),config.getList(JdbcSourceTaskConfig.TIMESTAMP_COLUMN_NAME_CONFIG),config.getLong(JdbcSourceTaskConfig.TIMESTAMP_DELAY_INTERVAL_MS_CONFIG),config.getBoolean(JdbcSourceTaskConfig.VALIDATE_NON_NULL_CONFIG),config.timeZone(),config.getString(JdbcSourceTaskConfig.QUERY_SUFFIX_CONFIG).trim(),offset);tableQueue.add(querier);}}@Overridepublic List<SourceRecord> poll() throws InterruptedException {List<SourceRecord> records = new ArrayList<>();int consecutiveEmptyResults = 0;while (running.get() && records.isEmpty() && consecutiveEmptyResults < CONSECUTIVE_EMPTY_RESULTS_BEFORE_RETURN) {TableQuerier querier = tableQueue.poll();if (querier == null) {consecutiveEmptyResults++;time.sleep(100);continue;}try {List<SourceRecord> querierRecords = querier.poll(cachedConnectionProvider.getConnection(), time);records.addAll(querierRecords);if (querierRecords.isEmpty()) {consecutiveEmptyResults++;} else {consecutiveEmptyResults = 0;}} catch (SQLException e) {log.error("Error polling for table {}: {}", querier.getTableOrQuery(), e.getMessage());maxRetriesPerQuerier--;if (maxRetriesPerQuerier > 0) {time.sleep(1000);tableQueue.add(querier);} else {log.error("Max retries exceeded for table {}", querier.getTableOrQuery());}} finally {if (running.get()) {tableQueue.add(querier);}}}return records;}@Overridepublic void stop() {running.set(false);cachedConnectionProvider.close(true);if (dialect != null) {dialect.close();}}
}
  • 详细解释
    • start 方法:初始化任务配置,检查配置的有效性,获取数据库连接,设置事务隔离级别,确定查询模式,获取偏移量,创建表查询器。
    • poll 方法:从表查询器队列中取出查询器,执行查询,将查询结果转换为 SourceRecord 列表返回。如果查询失败,进行重试。
    • stop 方法:停止任务,关闭数据库连接和方言。
3. JdbcSourceConnectorConfig
  • 功能:配置类,用于管理 JdbcSourceConnector 的配置信息。
  • 关键代码分析
public class JdbcSourceConnectorConfig extends AbstractConfig {// ... 省略部分代码public static final String CONNECTION_URL_CONFIG = CONNECTION_PREFIX + "url";public static final String CONNECTION_USER_CONFIG = CONNECTION_PREFIX + "user";public static final String CONNECTION_PASSWORD_CONFIG = CONNECTION_PREFIX + "password";public static final String CONNECTION_ATTEMPTS_CONFIG = CONNECTION_PREFIX + "attempts";public static final String CONNECTION_BACKOFF_CONFIG = CONNECTION_PREFIX + "backoff.ms";public static final String POLL_INTERVAL_MS_CONFIG = "poll.interval.ms";public static final String BATCH_MAX_ROWS_CONFIG = "batch.max.rows";public static final String NUMERIC_PRECISION_MAPPING_CONFIG = "numeric.precision.mapping";public static final String NUMERIC_MAPPING_CONFIG = "numeric.mapping";public static final String DIALECT_NAME_CONFIG = "dialect.name";public static final String MODE_CONFIG = "mode";public static final String INCREMENTING_COLUMN_NAME_CONFIG = "incrementing.column.name";public static final String TIMESTAMP_COLUMN_NAME_CONFIG = "timestamp.column.name";public static final String TIMESTAMP_DELAY_INTERVAL_MS_CONFIG = "timestamp.delay.interval.ms";public static final String VALIDATE_NON_NULL_CONFIG = "validate.non.null";public static final String QUERY_CONFIG = "query";public static final String TABLE_WHITELIST_CONFIG = "table.whitelist";public static final String TABLE_BLACKLIST_CONFIG = "table.blacklist";public static final String TABLE_POLL_INTERVAL_MS_CONFIG = "table.poll.interval.ms";public static final String TABLE_MONITORING_STARTUP_POLLING_LIMIT_MS_CONFIG = "table.monitoring.startup.polling.limit.ms";public static final String TOPIC_PREFIX_CONFIG = "topic.prefix";public static final String TRANSACTION_ISOLATION_MODE_CONFIG = "transaction.isolation.mode";public JdbcSourceConnectorConfig(Map<?, ?> originals) {super(configDef(), originals);}public static ConfigDef configDef() {ConfigDef configDef = new ConfigDef();configDef.define(CONNECTION_URL_CONFIG,Type.STRING,CONNECTION_URL_DEFAULT,Importance.HIGH,CONNECTION_URL_DOC,null,-1,Width.LONG,CONNECTION_URL_DISPLAY);configDef.define(CONNECTION_USER_CONFIG,Type.STRING,null,Importance.MEDIUM,CONNECTION_USER_DOC,null,-1,Width.MEDIUM,CONNECTION_USER_DISPLAY);// ... 省略其他配置项的定义return configDef;}
}
  • 详细解释
    • 定义了一系列配置项,如数据库连接 URL、用户名、密码、轮询间隔、批量最大行数等。
    • configDef 方法用于定义配置项的元信息,包括类型、默认值、重要性、文档等。

工作流程

  1. 启动连接器JdbcSourceConnectorstart 方法被调用,初始化配置,获取数据库连接,启动表监控线程。
  2. 生成任务配置JdbcSourceConnectortaskConfigs 方法根据配置生成任务配置,将表分配到不同的任务中。
  3. 启动任务JdbcSourceTaskstart 方法被调用,初始化任务配置,获取数据库连接,设置事务隔离级别,创建表查询器。
  4. 轮询数据JdbcSourceTaskpoll 方法被周期性调用,从表查询器队列中取出查询器,执行查询,将查询结果转换为 SourceRecord 列表返回。
  5. 停止任务和连接器JdbcSourceTaskstop 方法和 JdbcSourceConnectorstop 方法被调用,关闭数据库连接和方言。

总结

kafka-connect-jdbc 通过 JdbcSourceConnectorJdbcSourceTask 实现了从 JDBC 数据库到 Kafka 的数据摄取。JdbcSourceConnector 负责管理连接器的生命周期和任务分配,JdbcSourceTask 负责实际的数据读取和转换。JdbcSourceConnectorConfig 用于管理连接器的配置信息。通过深入理解这些核心类和组件的工作原理,可以更好地使用和扩展 kafka-connect-jdbc

http://www.cadmedia.cn/news/4986.html

相关文章:

  • 英文都不懂 学网站建设维护难吗汕头网站建设平台
  • 新疆建设网个人云seo快速排名优化
  • 深圳最专业的高端网站建设重庆网站制作
  • 东西湖区建设局网站竞价账户托管公司哪家好
  • 海拉尔网页设计十大seo公司
  • 苏州商城网站建设国际最新新闻热点事件
  • 软件下载官网源码外贸网站谷歌seo
  • 广州棋牌软件开发公司seo培训中心
  • 益阳有专做网站的吗无锡谷歌推广
  • 建设项目环保验收平台网站新网站怎么做推广
  • 网上商城网站开发公司适合35岁女人的培训班
  • 招聘网站建设的项目描述搜狗推广平台
  • 网站建设属于哪个经营范围app推广一手单
  • 前程无忧网广州网站建设分类岗位网站建设企业建站
  • 成人宁波专业seo服务
  • 磨床 东莞网站建设seo网站推广服务
  • 南宁大型网站开发百度推广怎么弄
  • 关于网站图片广西网站seo
  • 建设网上银行官方网站微博营销策略
  • 婚庆公司一条龙包括哪些批量优化网站软件
  • 福州高端网站建设公司山东公司网站推广优化
  • 万网网站建设的子分类能显示多少个全国教育培训机构平台
  • 网络购物网站大全怎么去做网络推广
  • 哈尔滨服务好的建站焦作网站seo
  • 深圳开发公司网站怎么自己做网址
  • 网站建设重点是什么网络营销相关的岗位有哪些
  • 广西建设教育学会网站seo课程多少钱
  • 软件开发平台简介系统优化软件有哪些
  • 网站建设流程及费用google浏览器下载安装
  • 罗湖商城网站建设多少钱seo推广方案怎么做