ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【学习手册】Apache pulsar操作手册

2022-02-09 14:00:38  阅读:488  来源: 互联网

标签:hz 操作手册 admin -- topic test Apache pulsar


1、Apache pulsar安装部署

1.1、前期准备

  • zookeeper 3.4.5
  • pulsar安装包 2.8.1
  • 集群免密环境

1.2、部署步骤

1.2.1、上传安装包到linux服务器上

下载地址:https://pulsar.apache.org/zh-CN/download/

1.2.2、解压文件到data目录下

tar -zxvf apache-pulsar-2.8.1-bin.tar.gz  -C /data/

1.2.3、初始化集群元数据信息

在risen-cdh01上执行

bin/pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster \
  --zookeeper risen-cdh01:2181  \
  --configuration-store risen-cdh01:2181  \
  --web-service-url http://risen-cdh01:8089 \
  --web-service-url-tls https://risen-cdh01:8443 \
  --broker-service-url pulsar://risen-cdh01:6650 \
  --broker-service-url-tls pulsar+ssl://risen-cdh01:6651

执行成功

10:36:09.876 [main] INFO org.apache.bookkeeper.discover.ZKRegistrationManager - Successfully formatted BookKeeper metadata
10:36:09.880 [main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x16734464b360002 closed
10:36:09.880 [main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x16734464b360002
10:36:10.033 [main] INFO org.apache.pulsar.PulsarClusterMetadataSetup - Cluster metadata for 'pulsar-cluster-1' setup correctly

如果执行失败,进入zkclient中。删除相关文件即可

[zookeeper, counters, bookies, ledgers, managed-ledgers, schemas, namespace, admin, loadbalance]

1.2.4、修改Bookkeeper配置文件

vim conf/bookkeeper.conf

修改如下部分:

zkServers=risen-cdh01:2181,risen-cdh02:2181,risen-cdh03:2181

**ps:**端口修改可以自定义,但是不能与已有的端口冲突

1.2.5、修改brokers配置文件

vim  conf/broker.conf

修改如下部分:

zookeeperServers=risen-cdh01:2181,risen-cdh02:2181,risen-cdh03:2181
configurationStoreServers=risen-cdh01:2181,risen-cdh02:2181,risen-cdh03:2181
clusterName=pulsar-cluster

1.2.6、修改conf目录下所有8080端口

因为8080端口过于常用,很容易被占用

这里进行调整,改为8089即可

1.2.7、将修改后的文件分发到其他几台服务器

scp -r apache-pulsar-2.8.1/ risen-cdh02:$PWD
scp -r apache-pulsar-2.8.1/ risen-cdh03:$PWD

1.2.8、安装BookKeeper集群

分别在三台机器执行

bin/pulsar-daemon start bookie

关闭
bin/pulsar-daemon stop bookie

执行完毕之后使用如下命令看看是否启动成功

bin/bookkeeper shell bookiesanity

1.2.9、安装brokers集群

分别在三台机器执行

bin/pulsar-daemon start broker

关闭
bin/pulsar-daemon stop broker

然后在risen-cdh01上执行

bin/pulsar-admin brokers list pulsar-cluster

不报错则启动成功

2、Pulsar Manager安装部署

2.1、前期准备

  • pulsar集群安装完毕
  • 服务器安装了docker

2.2、安装步骤

2.2.1、docker拉取最新的环境

docker pull apachepulsar/pulsar-manager:latest

2.2.2、运行

docker run -dit \
    -p 9527:9527 -p 7750:7750 \
    -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
    apachepulsar/pulsar-manager:latest

2.2.3、创建账号

CSRF_TOKEN=$(curl http://risen-cdh01:7750/pulsar-manager/csrf-token)
curl \
    -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
    -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
    -H 'Content-Type: application/json' \
    -X PUT http://risen-cdh01:7750/pulsar-manager/users/superuser \
    -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'

2.2.4、查询有所的cluster

**pulsar-manager调用的pulsar-admin的api,而这个api需要从broker取信息,所以需要给pulsar-admin指定获取信息的broker url。 **

bin/pulsar-admin clusters list

2.2.5、指定cluster

bin/pulsar-admin clusters update pulsar-cluster --url http://192.168.5.213:8089

2.2.6、登录查询

访问http://risen-cdh01:9527

登录刚刚2.2.3设置的账号密码

安装完毕!

3、Pulsar概念简介

3.1、功能和特性

3.1.1、多租户

目的资源隔离,为每位用户配置不同的资源。A用户只能操作20%的资源,B用户操作30%资源(租户与命名空间的操作配合使用)

租户和命名空间是pulsar支持多租户的两个核心概念。
在租户级别,pulsar为特定的租户预留合适的存储空间、应用授权与认证机制
在命名空间级别,pulsar有一系列的配置策略。包括配额、流控、消息过期策略和命名空间之间的隔离策略

3.1.2、灵活的消息系统

  • 队列模型和流模型的统一,在Topic级别只需要保存一份数据,同一份数据可多次消费。以流式、队列等方式计算不同的订阅模型大大提升了灵活度
  • 同时通过事务采用Exactily-Once在进行消息传输的过程中,可以确保数据不丢失、不重复
  • 流模型可以用pulsar function进行进行 从几个topic中流式ETL然后写入到另外的Topic中

3.1.3、云原生架构

  • 计算与存储分离的云原生架构。数据从broker搬离,存在共享存储内部bookkeeper
  • 上层的broker是无状态的,负责数据的分发和服务
  • 下层的是持久化的存储层Bookie。
  • pulsar存储是分片的,避免扩容时受限制,实现数据的独立拓展和快速恢复

3.1.4、segmented Sreams(分片流)

  • 把无界的数据看成分片的流,分片分散存储在分层存储Bookkeeper集群和broker节点上。

3.1.5、支持跨地域复制

  • 实现可以跨cluster、跨地域灾备

3.2、Pulsar提供的组件

3.2.1、层级存储

  • bookkeeper存储。当数据过多之后,读取效率降低。可以将一部分数据放到其他地方(卸载分片)例如hdfs或者其他

3.2.2、Pulsar IO(Connector)连接器

  • 主要目的是 让pulsar和周边的其他软件进行集成。
  • 有两大组件,source、sink
  • 例如HDFS、spark、Flink、Flume、ES、HBase

3.2.3、Pulsar Functions(轻量级计算框架)

  • 为用户提供一个部署简单/API简单/运维简单的FASS平台
  • 进行一些流式的计算工作。
  • 类似于 kafka Stream

3.3、与kafka区别

3.3.1、概念模型

  • Kafka:producer → topic → consumer group → consumer
  • Pulsar:producer → topic → subsciption → consumer

在kafka中有消费者组(consumer group),消费者组中的消费者只能消费topic某个分区中的数据

而在pulsar中是发布订阅模式(sub),可以自己制定策略去进行消费,例如让每个消费者都能消费所有数据

3.3.2、消息消费模式

  • Kafka:主要集中在流(Stream)模式,对单个分区中是独占消费,没有共享(Queue)消费模式
  • Pulsar:提供统一的消费模型和API,可以自由设置是 一对一 还是独占、还是故障切换的方式

3.3.3、消息确认(ack)

  • kafka 使用offset
  • pulsar有专门的cursor管理,确保精准一次消费!

3.3.4、消息保留

  • kakfa:创建topic的时候可以指定数据保留策略,默认7天。到期不管是否消费直接删除,不支持TTL
  • pulsar:所有订阅者都消费了才会删除,不会丢失数据。也可以设置保留期,保留被消费的数据,支持TTL(多长时间内有效)

3.3.5、对比总结

  • pulsar速度比kafka 快得多, 占用的资源更少

3.4、常用名词释义

  • **Messages:**消息是 Pulsar 的基础“单元”。 消息指 producer 发布到 topic的内容,也指 consumer 从 topic 中 consume 的内容(并在消息处理完成后发送确认)。 消息类似于邮政服务系统中的信件。

  • **Producers:**生产者是连接 topic 的程序,它将消息发布到一个 Pulsar broker 上。

  • **发送模式:**同步发送(sync)或者异步(async)

  • **Consumers:**Consumer 向 broker 发送消息流获取申请以获取消息。 在 Consumer 端有一个队列,用于接收从 broker 推送来的消息。 队列大小可以通过receiverQueueSize 进行配置(默认:1000)。 每当 consumer.receive() 被调用一次,就从缓冲区(buffer)获取一条消息。

  • **接收模式:**同步接收(sync)或者异步接收 (async)

  • **监听:**在这个接口中,一旦接受到新的消息,received方法将被调用。

  • 确认:当 consumer 成功消费一条消息后会向 broker 发送一个确认请求(acknowledgement request)。 仅当所有订阅都完成确认后,消息才会被删除,在这之前消息都是被永久保存的。 如果希望消息被 Consumer 确认后仍然保留下来,可配置 消息保留策略实现。

  • **Topic:**Pulsar 中的 topic 是被命名的通道,用做从producer到 consumer传输消息。 Topic的名称为符合良好结构的URL:{persistent|non-persistent}://tenant/namespace/topic

  • **namespace:**命名空间是租户内部逻辑上的命名术语。 一个租户可以通过admin API创建多个命名空间。 例如,包含多个应用程序的租户可以为每个应用程序创建单独的命名空间。 Namespace使得程序可以以层级的方式创建和管理topic Topicmy-tenant/app1 ,它的namespace是app1这个应用,对应的租户是 my-tenant。 你可以在namespace下创建任意数量的topic。

  • **Subscriptions:**订阅是命名好的配置规则,指导消息如何投递给消费者。 Pulsar 中有 4 种可用的订阅模式: 独占(exclusive),共享(shared),灾备(failover)和 键共享(key_shared)。

  • **多主题订阅:**Pulsar消费者可以同时订阅多个topic

4、Pulsar架构

**核心:**计算与存储分离

4.1、单个Pulsar集群构成

  • 多个broker负责处理和负载均衡的producer发出的消息(避免数据倾斜到一个broker),并将这些消息分派给consumer
  • broker和pulsar配置存储江湖来处理相应的任务,并将消息存储在BookKeeper(bookies)实例中
  • broker依赖zookeeper集群处理特定的任务
  • 多个bookie的bookkeeper集群负责消息的持久化存储
  • 一个zookeeper集群,用来处理多个pulsar集群之间的协调任务

4.2、broker

  • 无状态组件,主要负责运行另外两个组件:
  • HTTP服务器,端口默认8080,上边部署为8089端口。暴露了REST系统管理接口,以及在生产者消费者之间进行Topic的查找API
  • 调度分发器,端口6550,一步的TCP服务器,通过二进制协议应用于数据传输

broker会把数据从Managed Ledger缓存中分派到consumer,当积压超过缓存大小的时候,开始将数据给Bookkeeper

4.3、zookeeper

pulsar用zk进行源数据的存储、集群配置和协调

配置存储:存储租户、命名空间和其他需要全局一致的配置项

4.4、bookkeeper

持久化存储容器,是一个分布式的预写(WAL)

特性参考官网文档

4.5、pulsar proxy

为所有的broker提供一个网关,当不能直接连接的时候,可以通过proxy与brokers进行通信

5、Pulsar操作简介

5.1、pulsar-admin操作namespace命令

5.1.1、创建指定租户的namespaces

pulsar-admin namespaces create test-tenant/test-namespace

5.1.2、列出租户下所有namespaces

pulsar-admin namespaces list test-tenant

5.1.3、删除租户下已经存在的namespaces

pulsar-admin namespaces delete test-tenant/ns1

5.1.4、设置积压配额政策

pulsar-admin namespaces set-backlog-quota --limit 10--policy producer_request_hold test-tenant/ns1

5.1.5、查看积压配额策略

pulsar-admin namespaces get-backlog-quotas test-tenant/ns1

5.1.6、移除积压配额策略

pulsar-admin namespaces remove-backlog-quota test-tenant/ns1

5.1.7、设置持久化策略

  • Bookkeeper-ack-quorum:每个 entry 在等待的 acks(有保证的副本)数量,默认值:0
  • Bookkeeper-ensemble:单个 topic 使用的 bookie 数量,默认值:0
  • Bookkeeper-write-quorum:每个 entry 要写入的次数,默认值:0
  • Ml-mark-delete-max-rate:标记-删除操作的限制速率(0表示无限制),默认值:0.0
pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2--bookkeeper-ensemble 3--bookkeeper-write-quorum 2--ml-mark-delete-max-rate 0 test-tenant/ns1

5.1.8、获取持久化策略

pulsar-admin namespaces get-persistence test-tenant/ns1

5.1.9、卸载命名空间

pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/ns1

5.1.10、清除消息堆积

pulsar-admin namespaces clear-backlog --submy-subscription test-tenant/ns1

5.1.11、设置消息存留参数

命名空间包含多个 topic,每个 topic 的保留大小(存储大小)不应超过特定阈值,否则其存储时间会受到限制。 可通过以下命令配置指定命名空间中 topic 的保留大小和保留时间。

pulsar-admin set-retention --size 10--time 100 test-tenant/ns1

5.1.12、设置消息分发速率

为给定命名空间下所有 topic 设置消息派发速率。 通过每 X 秒消息数(msg-dispatch-rate))或者每 X 秒消息字节数来限制(byte-dispatch-rate)派发速率。 派发速率指每秒派发的消息数,可通过 dispatch-rate-period 来配置。 msg-dispatch-ratebyte-dispatch-rate 的默认值均为 -1,即禁用配额限制。

pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1

5.1.13、获取消息分发速率配置

发送消息数 / 秒

pulsar-admin namespaces get-dispatch-rate test-tenant/ns1

5.2、pulsar-admin操作Tenants(租户)命令

5.2.1、获取资源列表

pulsar-admin tenants list

5.2.2、创建租户

pulsar-admin tenants create 租户名

5.2.3、删除租户

pulsar-admin tenants delete 租户名

5.3、pulsar-admin操作Topic命令

5.3.1、列出指定命名空间下所有持久 topic

pulsar-admin persistent list my-tenant/my-namespace

5.3.2、授权给客户端用户,允许其在指定的 topic 上执行某些操作

pulsar-admin persistent grant-permission \
  --actions produce,consume --role application1 \
  persistent://test-tenant/ns1/tp1 \

5.3.3、获取权限

pulsar-admin persistent permissions \
  persistent://test-tenant/ns1/tp1 \
{
    "application1": [
        "consume",
        "produce"
    ]
}

5.3.4、取消权限

pulsar-admin persistent revoke-permission \
  --role application1 \
  persistent://test-tenant/ns1/tp1 \
{
  "application1": [
    "consume",
    "produce"
  ]
}

5.3.5、删除topic

pulsar-admin persistent delete  persistent://test-tenant/ns1/tp1 

5.3.6、卸载该命名空间下的topic

pulsar-admin persistent unload   persistent://test-tenant/ns1/tp1

5.3.7、查看topic中的10条数据

pulsar-admin persistent peek-messages \
  --count 10 --subscription my-subscription \
  persistent://test-tenant/ns1/tp1

5.3.8、创建topic

**注意:**不管是否有分区,在创建topic后,60s内无操作则会认为该topic是不活动的,会进行删除

相关参数:

Brokerdeleteinactivetopicsenabenabled:默认值为true表示是否启动自动删除功能
BrokerDeleteInactiveTopicsFrequencySeconds:默认60s
  • 创建没有分区的topic
pulsar-admin topics create persistent://my-tenant/my-namespace/mytopic
  • 创建有分区的topic
pulsar-admin topics create-partitioned-copic persistent://my-tenant/my-namespace/mytopic --partitions 5

5.3.9、查询topic被哪个broker代理执行

pulsar-admin topics lookup persistent://my-tenant/my-namespace/mytopic

6、权限操作

权限可以具体操作topic级别,这里用命名空间级别进行测试

6.1、配置

6.1.1、生成秘钥

在pulsar根目录下创建一个key目录用于存放key

bin/pulsar tokens create-secret-key --output key/my-secret.key --base64

6.1.2、创建身份令牌

创建一个super用户的,名字随便起,主要用于之后的管理。

bin/pulsar tokens create --secret-key key/my-secret.key --subject hz-super

执行完拿到一个super用户的拿到一个token

eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJoei1zdXBlciJ9.1ePk26E5XllSBKrSUkyR7XGnwJ1C1G1ceI9XdriNwXE

6.1.3、配置broker.conf

authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
tokenSecretKey=/opt/apache-pulsar-2.8.1/key/my-secret.key
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJoei1zdXBlciJ9.1ePk26E5XllSBKrSUkyR7XGnwJ1C1G1ceI9XdriNwXE
superUserRoles=hz-super

6.1.4、配置client.conf

authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJoei1zdXBlciJ9.1ePk26E5XllSBKrSUkyR7XGnwJ1C1G1ceI9XdriNwXE

6.1.5、重启broker并验证

bin/pulsar-daemon stop broker
bin/pulsar-daemon start broker
bin/pulsar-admin tenants list

已经正常

"hz-test-tenants"
"public"
"pulsar"

6.2、java测试

6.2.1、生成新用户的token

bin/pulsar tokens create --secret-key key/my-secret.key  --subject test

拿到token

eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.ZLU3PsOlh3H282_n34i7Wm8qyb-VMckKsfY-QU9rUIQ

6.2.2、赋权

bin/pulsar-admin namespaces grant-permission hz-test-tenants/hz-ns02 --role test  --actions produce,consume

6.2.3、构建java客户端代码

PulsarClient client = PulsarClient.builder()
        .serviceUrl(SERVER_URL)
        .enableTcpNoDelay(true)
        .build();

直接执行报错

Exception in thread "main" org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:965)
	at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:97)
	at ConsumerDemo.main(ConsumerDemo.java:27)

6.2.4、增加token配置

.authentication(AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.ZLU3PsOlh3H282_n34i7Wm8qyb-VMckKsfY-QU9rUIQ"))

正常执行

6.3、java赋权操作

6.3.1、创建PulsarAdmin

需要用super账户的token来进行验证,否则会报权限不足的错误

PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl(SERVER_HTTP_URL)
        .authentication(AuthenticationFactory.token(TOKEN))
        .build();

6.3.2、查询命名空间namespace的用户权限

System.out.println(admin.namespaces().getPermissions("hz-test-tenants/hz-ns02").toString());

结果:

{hz-super=[consume, produce], hz-test-tenants=[consume, produce]}

6.3.3、授权命名空间的访问权限

Set<AuthAction> action  = new HashSet<AuthAction>();
action.add(AuthAction.produce);
admin.namespaces().grantPermissionOnNamespace("hz-test-tenants/hz-ns02", "hz-produce", action);
System.out.println(admin.namespaces().getPermissions("hz-test-tenants/hz-ns02").toString());

结果:

{hz-test-tenants=[consume, produce], hz-super=[consume, produce], hz-produce=[produce]}

6.3.4、移除命名空间的权限

admin.namespaces().revokePermissionsOnNamespace("hz-test-tenants/hz-ns02", "hz-produce");
System.out.println(admin.namespaces().getPermissions("hz-test-tenants/hz-ns02").toString());

结果:

{hz-test-tenants=[consume, produce], hz-super=[consume, produce]}

6.3.5、查询topic的操作权限

System.out.println(admin.topics().getPermissions("persistent://hz-test-tenants/hz-ns02/hz-topic2"));

结果:

{hz-super=[consume, produce], hz-test-tenants=[consume, produce]}

6.3.6、授予用户操作topic的权限

Set<AuthAction> action1 = new HashSet<AuthAction>();
action1.add(AuthAction.consume);
admin.topics().grantPermission("persistent://hz-test-tenants/hz-ns02/hz-topic2","hz-test-topic", action1);
System.out.println(admin.topics().getPermissions("persistent://hz-test-tenants/hz-ns02/hz-topic2"));

结果:

{hz-super=[consume, produce], hz-test-topic=[consume], hz-test-tenants=[consume, produce]}

6.3.7、移除用户的权限

admin.topics().revokePermissions("persistent://hz-test-tenants/hz-ns02/hz-topic2","hz-test-topic") ;
System.out.println(admin.topics().getPermissions("persistent://hz-test-tenants/hz-ns02/hz-topic2"));

结果:

{hz-super=[consume, produce], hz-test-tenants=[consume, produce]}

7、消息保留和过期策略

7.1、Retention

  • 用户可以将 Consumer 已经确认的消息保留下来
  • 默认值:defaultRetentionTimeInMinutes=0
  • 配置可以在broker中也可以通过命令行:
$ pulsar-admin namespaces get-retention [your tenant]/[your-namespace]
{
  "retentionTimeInMinutes": 10,
  "retentionSizeInMB": 0
}

java

int retentionTime = 10; // 10 minutes
int retentionSize = 500; // 500 megabytes
RetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);
admin.namespaces().setRetention("hz-test-tenants/hz-ns02", policies );

7.2、TTL(Time To Live) 策略

  • 对于未确认的消息,用户可以通过设置 TTL 来使未确认的消息到达已经确认的状态
  • 默认情况下,Pulsar 会持久化所有未被确认的消息
  • 如果未被确认的消息有很多,这种策略会造成大量的消息被积压
  • 通过设置 TTL 使得未被确认的消息进入到被确认的状态,当超过设定的 TTL 时间之后,配合相应的 Retention 策略将消息丢弃
pulsar-admin namespaces get-message-ttl [your tenant]/[your namespace] 60

java

admin.namespaces().setNamespaceMessageTTL("hz-test-tenants/hz-ns02",60);

8、Connect IO连接器

以mysql binlog为例

8.1、下载连接器

在下载页面找到对应的连接器进行下

https://pulsar.apache.org/zh-CN/download/

8.2、创建文件夹connecters

用于放这些连接器

8.3、创建连接器配置文件

tenant: "public"
namespace: "default"
name: "debezium-mysql-source"
topicName: "debezium-mysql-topic"
archive: "connectors/pulsar-io-debezium-mysql-2.8.1.nar"
parallelism: 1

configs:
    database.hostname: "risen-cdh01"
    database.port: "3306"
    database.user: "cdh"
    database.password: "123456"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "test"
    database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
    database.history.pulsar.topic: "history-topic"
    database.history.pulsar.service.url: "pulsar://risen-cdh01:6650"
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
	pulsar.service.url: "pulsar://risen-cdh01:6650"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    offset.storage.topic: "offset-topic"

8.4、创建连接器

bin/pulsar-admin source  create --source-config-file debeziumConf/mysql.yaml 

8.5、验证是否成功

bin/pulsar-admin persistent list public/default
"persistent://public/default/dbserver1.test.test01"
"persistent://public/default/dbserver1"
"persistent://public/default/dbserver1.test.mysql_func2"

生成了对应的topic

8.6、模拟消费者进行订阅

bin/pulsar-client consume -s "sub-products" public/default/dbserver1.test.mysql_func2 -n 0

arDatabaseHistory"
database.history.pulsar.topic: “history-topic”
database.history.pulsar.service.url: “pulsar://risen-cdh01:6650”
key.converter: “org.apache.kafka.connect.json.JsonConverter”
pulsar.service.url: “pulsar://risen-cdh01:6650”
value.converter: “org.apache.kafka.connect.json.JsonConverter”
offset.storage.topic: “offset-topic”


### 8.4、创建连接器

bin/pulsar-admin source create --source-config-file debeziumConf/mysql.yaml


### 8.5、验证是否成功

bin/pulsar-admin persistent list public/default


“persistent://public/default/dbserver1.test.test01”
“persistent://public/default/dbserver1”
“persistent://public/default/dbserver1.test.mysql_func2”


生成了对应的topic

### 8.6、模拟消费者进行订阅

bin/pulsar-client consume -s “sub-products” public/default/dbserver1.test.mysql_func2 -n 0




[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EvZG76x7-1644385852303)(C:\Users\ADMINI~1\AppData\Local\Temp\1640938418974.png)]

标签:hz,操作手册,admin,--,topic,test,Apache,pulsar
来源: https://blog.csdn.net/qq237408305/article/details/122840408

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有