做云市场行业的领导者
欢迎光临本网站
主页 > 阿里云 >

动手:使用KSQ全民淘客-L构建流式应用程序

发布时间:2020-09-22 15:45   来源:维塔尔云    作者:维塔尔云

动手:使用KSQL构建流式应用程序

在这篇博客文章中,我们将向您展示如何使用KSQL构建一个演示流式应用程序KSQL是Apache Kafka®的流式SQL引擎。此应用程序根据歌曲播放事件流实时计算顶级音乐图表。这个演示听起来熟悉吗?应该的。去年,Confluent使用Kafka Streams API开发了一个类似的演示应用程序。从那时起,Confluent发布了可供生产使用的KSQL。用户可以根据用例在Kafka流或KSQL之间进行选择。这两种技术都使开发人员能够构建实时、可伸缩、容错的应用程序,只需一次就可以完成,用于融合云和本地Kafka部署。这里我们采用相同的音乐应用程序概念,并使用KSQL重写它,以便您可以比较和对比体验。 本教程附带:用于音乐图表屏幕播放的KSQL流媒体应用程序自动KSQL演示安装程序注意:本教程假设您正在Docker中运行KSQL音乐演示应用程序。您也可以通过运行start脚本在一个融合的平台本地安装上运行这个演示。复制此示例repo。$git克隆https://github.com/confluentinc/examples在Docker的高级设置中,将专用于Docker的内存增加到至少8GB(默认为2GB)。在examples/music目录中,通过运行一个命令来启动demo,该命令将打开Docker容器。完成此操作不到两分钟。$cd示例/音乐$docker组成-d在Confluent Control Center的日志中看到以下输出之前,物联网教室,请不要继续。$docker compose logs-f control center | grep-i"已启动NetworkTrafficServerConnector"控制中心|[2018-09-06 15:03:22518]信息Trafficed4885网络连接器{HTTP/1.1,[HTTP/1.1]}{0.0.0.0:9021}(org.eclipse.jetty.服务器.AbstractConnector)检查源Kafka数据Docker Compose文件包含一个容器,该容器自动为Apache Avro中的两个主题生成源数据™ 格式:播放事件:播放事件流("播放了歌曲X")歌曲源:歌曲元数据流("歌曲X由艺术家Y编写")启动演示后,使用googlechrome导航到位于的控制中心:9021。在"管理"下选择"主题",然后向下滚动并单击该主题并选择"检查"。检查主题游戏事件:或者使用Control Center中的KSQL查询编辑器检查打印主题"play events";:通过在Control Center中使用KSQL查询编辑器中的FROM beging子句打印主题歌曲提要:从一开始就打印"歌曲提要";注意:此时,您不能使用Control Center中的"主题检查"功能检查主题歌曲提要中的数据,因为主题检查仅适用于新数据,企业交流软件,而不适用于已经生成到主题的先前数据,这里就是这种情况。回顾一下,源数据播放事件和歌曲提要如下所示:创建新流让我们从Kafka主题创建一个新的流,从playevents主题开始。将主题播放事件注册为KSQL流,并指定它是Avro格式。将新的流名称配置为ksql\u playevents将消息值编码更改为AVRO(默认值:JSON)注意:在KSQL流和表的名称前面加上KSQL_u。这不是必需的,但是这样做可以让您在这个音乐演示的Kafka Streams API版本的同时运行这些KSQL查询,并避免命名冲突。从KSQL查询编辑器注册主题播放事件:使用(KAFKA_TOPIC='play-events',云服务器服务,VALUE_FORMAT='AVRO'创建流ksql_playevents)或者,从KSQL Streams视图中,选择"createstream"并填写如下所示的字段。由于Control Center与Confluent Schema Registry集成,它可以自动检测字段song_id和duration以及它们各自的数据类型。过滤数据对新创建的流ksql\u playevents进行一些基本过滤,例如对播放了至少30秒的歌曲进行筛选。这是您从KSQL查询编辑器获得的结果:从持续时间>30000的ksql_playevents中选择*;上述查询不是持久的,如果关闭此屏幕,它将停止。要使查询持久化并在显式终止之前保持运行,请在前一个查询前面添加createstreamAS。您将在KSQL查询编辑器中看到:创建流ksql_playevents_min_duration作为SELECT*FROM ksql_playevents,其中duration>30000;现在这个持久查询将显示在查询列表中。创建新表接下来,让我们研究歌曲提要主题,它有效地表示了一个歌曲表。KSQL表中的数据必须具有String类型的键。在这个演示中,表可以键入歌曲的ID,以便在该表上进行联接和聚合。然而,最初的Kafka主题有一个Long类型的键,ID字段的类型是BIGINT。您仍然可以通过几个简单的步骤创建一个表。1从原始Kafka主题曲源创建流:使用(KAFKA_TOPIC='song-feed',云品,VALUE_FORMAT='AVRO')创建流ksql_songfeed;然后描述流以查看与此主题关联的字段,并注意字段ID的类型为BIGINT:描述ksql_songfeed;2使用PARTITION BY子句指定键,并使用CAST函数将字段类型更改为String:以(KAFKA_TOPIC='ksql_songfeedwithkey',VALUE_FORMAT='AVRO')作为选择演员(ID AS STRING)作为ID,专辑,艺术家,名称,流派的ksql_songfeed按ID创建流;三。将上面的流转换成一个以ID字段为键的表,现在它的类型是String。此表是事件的具体化视图,每个键只有最新值,表示最新的歌曲表创建表ksql_songtable WITH(KAFKA_TOPIC='ksql_SONGFEEDWITHKEY',VALUE_FORMAT='Avro',KEY='ID');请确认此KSQL表中的条目具有与歌曲的字符串ID匹配的ROWKEY:从ksql_songtable limit 5中选择*;使用歌曲表加入播放活动现在,我们已经创建了一个名为ksql_playevents_min_duration的过滤播放事件流,以及一个名为ksql_songtable的歌曲元数据表。使用流表联接使用歌曲元数据丰富播放事件流。这将导致一个新的播放事件流丰富的描述性歌曲信息,如歌曲标题。创建流ksql_歌曲播放作为选择plays.SONG_ID游戏AS ID,ALBUM,ARTIST,NAME,GENRE,DURATION,1 AS KEYCOL FROM ksql_playevents_min_DURATION plays LEFT JOIN ksql_songtable ONplays.SONG_ID游戏= 歌曲表.ID;注意添加了一个子句1作为KEYCOL。对于每一行,这将创建一个值为1的新字段KEYCOL,稍后可以在其他派生流和表中使用该字段进行全局聚合。创建顶级音乐排行榜现在,您可以创建一个最高的音乐图表,以查看哪些歌曲播放得最多。在上面创建的ksql_songplays流上使用COUNT函数。创建表ksql\u songplaycounts作为选择ID,NAME,什么是软件企业,GENRE,KEYCOL,COUNT(*)作为ksql廑songplays GROUP BY ID,NAME,GENRE,KEYCOL;虽然有史以来最棒的点击率都很酷,但如果能看到最后30秒的统计数据,那也是件好事。创建另一个查询,添加一个WINDOW子句,该子句以30秒的间隔给出所有歌曲的播放事件计数。创建表ksqlüu songplaycounts30作为选择ID,NAME,GENRE,KEYCOL,COUNT(*)作为ksql悱songplays WINDOW TUMBLING(大小30秒)按ID,NAME,GENRE,KEYCOL分组;这是你建造的恭喜您,您构建了一个实时处理数据的流式应用程序!该应用程序使用歌曲元数据丰富了播放事件流,并生成了最高计数。任何下游系统都可以使用KSQL查询的结果进行进一步处理。如果您已经熟悉SQL语义,希望本教程不难理解。从ksql_songplaycounts30 LIMIT 5中选择*;进一步了解KSQL的功能下载Confluent Platform并按照快速入门开始使用KSQL运行本教程中描述的KSQL音乐应用程序演示,并探索展示KSQL的其他自动演示观看升级你的KSQL视频系列在我们的社区Slack小组的ksql频道中提问

上一篇:溪流与桌子:智慧农业物联网系统-一枚硬币的两面
下一篇:没有了

分享到:
0
最新资讯
阅读排行