一、部署单机版kafka
kafka部署之前先要部署zookeeper。编写部署zookeeper的YAML文件如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| apiVersion: apps/v1
kind: Deployment
metadata:
name: zookeeper-deployment
spec:
replicas: 1
selector:
matchLabels:
app: zookeeper
template:
metadata:
labels:
app: zookeeper
spec:
containers:
- name: zookeeper
image: 192.168.1.192:5000/zookeeper:latest
ports:
- containerPort: 2181
env:
- name: ALLOW_ANONYMOUS_LOGIN
value: "yes"
---
apiVersion: v1
kind: Service
metadata:
name: zookeeper-service
spec:
selector:
app: zookeeper
ports:
- protocol: TCP
port: 2181
targetPort: 2181
nodePort: 32181 # NodePort设置为32181
type: NodePort
|
这里的镜像源我选择本地自建仓库。关于本地自建仓库部署方式见下一篇文章。
部署的大致过程是先使用deploy创建pod,容器内部应用端口为2181.再使用service将容器内部端口暴露出来,使得外部可以访问。这里使用nodeport方暴露应用端口。
32181(容器外):2181(容器内)。
1.部署zookeeper
1
| # kubectl apply -f zk.yml
|
查看状态:
1
2
3
4
5
6
| [root@k3s-master kafka]# kubectl get po
NAME READY STATUS RESTARTS AGE
zookeeper-deployment-5bfb7d5b57-nc964 1/1 Running 0 3h53m
[root@k3s-master kafka]# kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
zookeeper-service NodePort 10.43.238.86 <none> 2181:32181/TCP 3h54m
|
2.部署kafka
编写部署kafka的YAML文件.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-deployment
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: 192.168.1.192:5000/kafka:latest
ports:
- containerPort: 9092
command:
- sh
- -c
- "exec /opt/bitnami/kafka/bin/kafka-server-start.sh /opt/bitnami/kafka/config/server.properties.original --override broker.id=0 \
--override listeners=PLAINTEXT://:9092 \
--override advertised.listeners=PLAINTEXT://192.168.1.189:30092 \
--override zookeeper.connect=192.168.1.189:32181/kafka \
--override auto.create.topics.enable=true \
--override zookeeper.session.timeout.ms=6000 \
--override zookeeper.set.acl=false"
env:
- name: MY_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ALLOW_PLAINTEXT_LISTENER
value: "yes"
- name: KAFKA_HEAP_OPTS
value : "-Xms1g -Xmx1g"
---
apiVersion: v1
kind: Service
metadata:
name: kafka-service
spec:
type: NodePort
ports:
- port: 30092
targetPort: 9092
nodePort: 30092
selector:
app: kafka
|
这次选用的kafka镜像是:bitnami/kafka:latest和官方镜像有部分不同,所以在启动参数上有些更改。
简单说明下部署过程:
首先使用使用deploy部署pod,在保证不改变容器内原配置下,使用命令参数改变kafka启动方式。和zk一样,创建service并使用nodeport方式将容器应用端口暴露出来。
30092(容器外):9092(容器内)。
应用配置,并查看应用部署状态:
1
2
3
4
5
6
7
8
| [root@k3s-master kafka]# kubectl get po
NAME READY STATUS RESTARTS AGE
zookeeper-deployment-5bfb7d5b57-nc964 1/1 Running 0 4h4m
kafka-deployment-5458486459-cqmcl 1/1 Running 1 (3h46m ago) 3h47m
[root@k3s-master kafka]# kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
zookeeper-service NodePort 10.43.238.86 <none> 2181:32181/TCP 4h4m
kafka-service NodePort 10.43.217.79 <none> 30092:30092/TCP 3h47m
|
二、测试
创建kafka主题。进入到kafka的pod中,使用自带的命令行客户端工具。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| [root@k3s-master kafka]# kubectl exec -it po kafka-deployment-5458486459-cqmcl bash
kubectl exec [POD] [COMMAND] is DEPRECATED and will be removed in a future version. Use kubectl exec [POD] -- [COMMAND] instead.
Error from server (NotFound): pods "po" not found
[root@k3s-master kafka]# kubectl exec -it kafka-deployment-5458486459-cqmcl -- bash
I have no name!@kafka-deployment-5458486459-cqmcl:/$ cd /opt/bitnami/kafka/bin/
I have no name!@kafka-deployment-5458486459-cqmcl:/opt/bitnami/kafka/bin$ ls
connect-distributed.sh kafka-console-consumer.sh kafka-get-offsets.sh kafka-replica-verification.sh kafka-verifiable-producer.sh
connect-mirror-maker.sh kafka-console-producer.sh kafka-jmx.sh kafka-run-class.sh trogdor.sh
connect-plugin-path.sh kafka-consumer-groups.sh kafka-leader-election.sh kafka-server-start.sh windows
connect-standalone.sh kafka-consumer-perf-test.sh kafka-log-dirs.sh kafka-server-stop.sh zookeeper-security-migration.sh
kafka-acls.sh kafka-delegation-tokens.sh kafka-metadata-quorum.sh kafka-storage.sh zookeeper-server-start.sh
kafka-broker-api-versions.sh kafka-delete-records.sh kafka-metadata-shell.sh kafka-streams-application-reset.sh zookeeper-server-stop.sh
kafka-client-metrics.sh kafka-dump-log.sh kafka-mirror-maker.sh kafka-topics.sh zookeeper-shell.sh
kafka-cluster.sh kafka-e2e-latency.sh kafka-producer-perf-test.sh kafka-transactions.sh
kafka-configs.sh kafka-features.sh kafka-reassign-partitions.sh kafka-verifiable-consumer.sh
|
1.创建第一个Topic
1
| $ ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test001 --partitions 100 --replication-factor 1
|
查看详情
1
2
| $ ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
test001
|
kafka-topics.sh 是 Apache Kafka 提供的一个命令行工具,用于管理 Kafka 集群中的主题(topics)。我们可以使用它来创建、列出、描述、修改或删除主题。
列出所有主题:
1
| bin/kafka-topics.sh --list --bootstrap-server <kafka_connect_string>
|
创建主题
1
| bin/kafka-topics.sh --create --bootstrap-server <kafka_connect_string> --replication-factor <replication_factor> --partitions <num_partitions> --topic <topic_name>
|
描述主题
1
| bin/kafka-topics.sh --describe --bootstrap-server <kafka_connect_string> --topic <topic_name>
|
删除主题
默认情况下,Kafka 不会自动删除主题。但是,如果Kafka 集群启用了 delete.topic.enable 配置(在 server.properties 中),可以使用以下命令来删除主题:
1
| bin/kafka-topics.sh --delete --bootstrap-server <kafka_connect_string> --topic <topic_name>
|
但是,请注意,即使删除了主题,Kafka 也会保留与该主题相关的日志文件,除非也配置了日志删除策略。
三、部署kafka-ui
1
| docker run -itd -p 18080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui
|
访问方式
填写集群信息后,可以查询brokers、topics和consumers。