I am trying to follow the tutorial Deploying Debezium using the new KafkaConnector resource.
Like the tutorial, I am using minikube, but with the docker driver. Otherwise I followed it exactly, step by step.
However, at the “Create the connector” step, after creating the connector with
cat <<EOF | kubectl -n kafka apply -f -
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
  name: "inventory-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 192.168.99.1
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true"
EOF
and checking it with
kubectl -n kafka get kctr inventory-connector -o yaml
I got this error:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"192.168.49.2","database.password":"","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T18:20:11Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "12777"
  uid: 083df9a3-83ce-4170-a9bc-9573dafdb286
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: 192.168.49.2
    database.password: ""
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: ""
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T18:20:11.548Z"
    message: |-
      PUT /connectors/inventory-connector/config returned 400 (Bad Request): Connector configuration is invalid and contains the following 1 error(s):
      A value is required
      You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
    reason: ConnectRestException
    status: "True"
    type: NotReady
  observedGeneration: 1
I tried changing
database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
to
database.user: "debezium"
database.password: "dbz"
directly and re-applying, using the user and password from the “Secure the database credentials” step.
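One thing that may be worth ruling out here, offered as an observation rather than a confirmed diagnosis: in the output above, database.user and database.password are empty strings. With an unquoted here-document delimiter (<<EOF), the shell performs parameter expansion on the body, so a ${file:...} placeholder can be rewritten by the shell before kubectl ever sees it. Quoting the delimiter (<<'EOF') turns that expansion off. The sketch below only prints the manifest; render_connector is a hypothetical helper name, not something from the tutorial.

```shell
# Hypothetical helper: print the KafkaConnector manifest with the
# ${file:...} placeholders left untouched. The quoted delimiter ('EOF')
# disables shell parameter expansion inside the here-document.
render_connector() {
  cat <<'EOF'
apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
  name: "inventory-connector"
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: 192.168.99.1
    database.port: "3306"
    database.user: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_username}"
    database.password: "${file:/opt/kafka/external-configuration/connector-config/debezium-mysql-credentials.properties:mysql_password}"
    database.server.id: "184054"
    database.server.name: "dbserver1"
    database.whitelist: "inventory"
    database.history.kafka.bootstrap.servers: "my-cluster-kafka-bootstrap:9092"
    database.history.kafka.topic: "schema-changes.inventory"
    include.schema.changes: "true"
EOF
}

# Usage (needs a running cluster, so left commented out):
#   render_connector | kubectl -n kafka apply -f -
```

If the placeholders survive in the rendered text, any remaining "A value is required" error would have to come from somewhere else, such as the credentials file not being mounted.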
Also, the tutorial says:
“I’m using database.hostname: 192.168.99.1 as IP address for connecting to MySQL because I’m using minikube with the virtualbox VM driver. If you’re using a different VM driver with minikube you might need a different IP address.”
I am a little confused by this description. MySQL in the demo is deployed in Docker, while the other parts, such as Kafka, are deployed in minikube. Why does the description of database.hostname refer to minikube instead of Docker?
Anyway, when I run minikube ip, I get 192.168.49.2. However, after I changed database.hostname to 192.168.49.2 and ran kubectl get kctr inventory-connector -o yaml -n kafka, I got the same output as shown above.
I can access MySQL via localhost, as it is hosted in Docker.
However, I still got the same error when I changed database.hostname to localhost.
Any idea? Thanks!
Answers:
Method 1
The issue is that the service in minikube fails to communicate with MySQL running in Docker.
Regarding how to access the host’s localhost from inside a Kubernetes cluster, I found the question “How to access host’s localhost from inside kubernetes cluster”.
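For reference, a minimal sketch of that host-access route, under assumptions not verified against this setup: that the installed minikube version adds a host.minikube.internal entry resolving to the host machine, and that the MySQL container publishes port 3306 on the host. set_db_host is a hypothetical helper that rewrites database.hostname in a manifest read from standard input, and connector.yaml is a hypothetical file name.

```shell
# Hypothetical helper: replace the value of database.hostname in a
# manifest read from stdin, keeping the original indentation.
# $1 is the new host name; it is written back as a quoted YAML string.
set_db_host() {
  sed "s|^\( *database\.hostname:\).*|\1 \"$1\"|"
}

# Usage (needs a running minikube cluster, so left commented out):
#   minikube ssh "grep host.minikube.internal /etc/hosts"
#   set_db_host host.minikube.internal < connector.yaml | kubectl -n kafka apply -f -
```

Whether MySQL is then reachable still depends on how the container's port is published and which address MySQL binds to.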
However, I ended up deploying MySQL in Kubernetes directly with
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-pv.yaml
kubectl apply -f https://k8s.io/examples/application/mysql/mysql-deployment.yaml
(Copied from https://kubernetes.io/docs/tasks/run-application/run-single-instance-stateful-application/)
and these connector settings:
database.hostname: "mysql.default" # service `mysql` in namespace `default`
database.port: "3306"
database.user: "root"
database.password: "password"
Now when I run
kubectl -n kafka get kctr inventory-connector -o yaml
I got a new error saying that MySQL does not have a row-level binlog enabled. However, this means the connector can now connect to MySQL.
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1alpha1","kind":"KafkaConnector","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"my-connect-cluster"},"name":"inventory-connector","namespace":"kafka"},"spec":{"class":"io.debezium.connector.mysql.MySqlConnector","config":{"database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes.inventory","database.hostname":"mysql.default","database.password":"password","database.port":"3306","database.server.id":"184054","database.server.name":"dbserver1","database.user":"root","database.whitelist":"inventory","include.schema.changes":"true"},"tasksMax":1}}
  creationTimestamp: "2021-09-29T19:36:52Z"
  generation: 1
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: inventory-connector
  namespace: kafka
  resourceVersion: "2918"
  uid: 48bb46e1-42bb-4574-a3dc-221ae7d6a803
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.inventory
    database.hostname: mysql.default
    database.password: password
    database.port: "3306"
    database.server.id: "184054"
    database.server.name: dbserver1
    database.user: root
    database.whitelist: inventory
    include.schema.changes: "true"
  tasksMax: 1
status:
  conditions:
  - lastTransitionTime: "2021-09-29T19:36:53.605Z"
    status: "True"
    type: Ready
  connectorStatus:
    connector:
      state: UNASSIGNED
      worker_id: 172.17.0.8:8083
    name: inventory-connector
    tasks:
    - id: 0
      state: FAILED
      trace: "org.apache.kafka.connect.errors.ConnectException: The MySQL server is not configured to use a row-level binlog, which is required for this connector to work properly. Change the MySQL configuration to use a row-level binlog and restart the connector.\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:207)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n"
      worker_id: 172.17.0.8:8083
    type: source
  observedGeneration: 1
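The trace asks for a row-level binlog. Below is a minimal sketch of the MySQL settings that usually control this, assuming a MySQL version where binary logging is not already enabled in ROW format. The server-id value and the mysql-bin base name are illustrative, and render_binlog_cnf and binlog_check_sql are hypothetical helper names. How the fragment reaches the container (ConfigMap, custom image, or mysqld arguments) is left open.

```shell
# Hypothetical helper: print a my.cnf fragment that turns on row-level
# binary logging. The server-id is an arbitrary illustrative value; it
# should differ from the connector's database.server.id (184054 above).
render_binlog_cnf() {
  cat <<'EOF'
[mysqld]
server-id     = 223344
log_bin       = mysql-bin
binlog_format = ROW
EOF
}

# Hypothetical helper: print a SQL statement that shows whether the
# settings took effect after MySQL has been restarted.
binlog_check_sql() {
  printf '%s\n' "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format');"
}

# Usage (assumes the Deployment is named mysql in the current namespace
# and the root password is "password", as in the settings above):
#   kubectl exec deploy/mysql -- mysql -uroot -ppassword -e "$(binlog_check_sql)"
```

After the settings are in place and MySQL has restarted, the connector also needs a restart, as the error message itself says.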
All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.