Dapr 入门教程之消息队列

网站建设4年前发布
16 0 0

20230306012622542a19505c8108a4845826bb175508ab144e52209,前面我们了解了 Dapr 对发布订阅的支持,本节我们将来介绍了 Dapr 中对消息队列的支持。消息队列,分为两种绑定,一种是输出绑定,一种是输入绑定。出和入是看数据的流向,输出绑定就是作为生产者的服务把消息通过 Dapr 传给消息队列,输入绑定就是作为消费者的服务通过 Dapr 从消息队列里得到消息。,这里的消息队列和发布订阅里的消息总线有什么区别呢?一个消息进入消息总线的话,所有订阅者都能得到这个消息,而一个消息进入消息队列的话,由消费者来取,一次只有一个人能得到。此外,消息总线是不要求处理顺序的,两个消息进入消息总线,谁先被拿到顺序是不一定的,而消息队列可以保证是先入先出的。,本节我们将创建两个微服务,一个具有输入绑定,另一个具有输出绑定,前面我们都使用的 Redis 这种中间件,这里我们将绑定到 Kafka。,绑定连接到 Kafka,允许我们将消息推送到 Kafka 实例(从 Python 微服务)中,并从该实例(从 Node.js 微服务)接收消息,而不必知道实例的位置。相反,同样只需要直接使用 Dapr API 通过 sidecars 连接即可。,首先我们在本地来运行示例应用,对应的架构图如下所示:,2023030601262211c3318120c9f148d8e7486c3d6eae0d3ad78d657Bindings 本地模式,同样使用 quickstarts 这个代码仓库:,由于我们这里是使用 Kafka 来做消息队列的中间件,所以我们首先需要在本地环境运行 Kafka,我们可以直接使用 https://github.com/wurstmeister/kafka-docker 这个项目以 Docker 方式运行。,定位到 quickstarts 的 tutorials/bindings 目录,下面有一个 docker-compose-single-kafka.yml 文件:,我们可以直接而使用 docker-compose 来启动一个单实例的 Kafka:,隔一段时间镜像拉取完成后以容器方式启动 Kafka:,在本地运行了 Kafka 后,接着我们可以运行输入绑定的 Node.js 微服务:,同样先安装服务依赖:,然后我们就可以使用 dapr run 命令来启动该微服务了,启动方式我们应该比较熟悉了,如下所示:,上面的命令和前面有点不一样的地方是多了一个 --components-path 用来指定组件路径,这是因为现在我们要使用 Kafka 这种中间件来作为我们的消息队列组件,那么我们就需要告诉 Dapr,在 ./components 目录下面就包含一个对应的 kafka_bindings.yaml 文件,内容如下所示:,前面在本地模式下面我们没有主动声明组件,是因为我们使用的就是默认的 Redis,而 Kafka 并不是内置就有的,所以需要我们主动声明,注意上面组件的类型为 type: bindings.kafka,metadata 下面是访问 Kafka 相关的元数据。正常情况下上面的启动命令会输出如下所示的日志信息:,接下来,需要运行输出绑定的 Python 微服务,定位到 pythonapp​ 目录,安装 requests 依赖:,然后同样用 dapr run 命令来启动该微服务,也要注意带上后面的 --components-path 参数:,启动完成后,观察 Python 服务的日志,可以看到不断输出如下所示成功输出绑定到 Kafka 的日志:,同样这个时候 Node.js 微服务中也不断有新的日志数据产生:,这是因为 Python 微服务每隔 1s 就会向我们绑定的消息队列发送一条消息,而 Node.js 微服务作为消费者当然会接收到对应的消息数据。,上面在本地环境下可以正常运行 Dapr bindings 服务,接下来我们再次将该示例部署到 Kubernetes 集群中来进行观察。,同样首先需要提供一个可用的 Kafka 实例,这里我们仍然使用 Helm Chart 方式来进行安装:,然后使用如下所示的命令来安装 Kafka:,这里我们指定了一个无需持久化数据(仅供测试)的 values 文件 kafka-non-persistence.yaml,内容如下所示:,安装完成后可以查看 Pod 的状态来保证 Kafka 启动成功:,接下来我们首先需要在 Kubernetes 集群中配置使用 Kafka 作为 Binding 消息中间件的 Component 组件:,注意该对象上面指定的组件类型为 bindings.kafka,metadata 下面的元信息包括 Kafka brokers地址、生产者和消费者的配置等等,直接应用上面的资源清单即可:,创建完成后在 Dapr Dashboard 中也可以看到对应的组件信息:,dapr dashboard components。,接着部署两个 Node.js 和 Python 微服务即可:,部署完成后可以同样分别观察 Node.js 和 Python 微服务的日志:,可以看到两个微服务的日志也服务我们的预期的。,前面我们在本地或 Kubernetes 中都运行了示例应用,而且没有更改任何代码,应用结果都符合预期,接下来我们看看这是如何工作的。,在查看应用程序代码之前,我们先看看 Kafka 绑定组件的资源清单文件,它们为 Kafka 连接指定 brokers,为消费者指定 topics 和 consumerGroup,为生产者指定了 publishTopic。,我们创建了名为 sample-topic 的组件,然后我们通过该组件配置的 Kafka 中的 sample 主题来设置输入和输出绑定。,现在我们先导航到 nodeapp​ 目录下面打开 app.js​ 文件,这是 Node.js 输入绑定示例应用的代码。这里使用 Express​ 暴露了一个 API 端点,需要注意的是 API 名称必须与在 Kafka 绑定组件中声明的组件名称相同,然后 Dapr 运行时将使用来自 sample 主题的事件,然后将 POST 请求与事件负载一起发送给 Node 应用程序。,所以当 Kafka 中收到消息后就会打印类似如下所示的日志:,然后我们导航到 pythonapp 目录下面打开 app.py 文件,这是输出绑定示例(生产者)应用程序的代码,该服务会每秒发送一次 POST 请求到 Dapr 的 http 端点的 http://localhost:3500/v1.0/bindings/<output_bindings_name>,并带有事件的 payload 数据。这个应用程序使用 bindings 组件名 sample-topic 作为 <output_bindings_name>,然后 Dapr 运行时将事件发送到上面的 Kafka 绑定组件中指定的 sample 主题上去。,上面代码中最重要的依然是 Dapr API 地址 dapr_url 的拼接 "http://localhost:{}/v1.0/bindings/sample-topic".format(dapr_port),注意我们依然是面向 localhost 编程,而 v1.0/bindings/<output_bindings_name> 端点则是 Dapr API 为我们封装的输出消息绑定的统一接口,非常简单方便。

© 版权声明

相关文章