做云市场行业的领导者
欢迎光临本网站
主页 > 阿里云 >

与Apache Kafka和大数据提供-KSQL的数据争论

发布时间:2020-09-22 15:44   来源:维塔尔云    作者:维塔尔云

与Apache Kafka和KSQL的数据争论

KSQLApache Kafka®的SQL流引擎,它将流处理的能力交给了任何了解SQL的人。用它来探索Kafka主题中的数据是很有趣的,但它的真正威力在于构建流处理应用程序。通过不断地将消息从一个Kafka主题流到另一个主题,应用SQL表示的转换,可以构建功能强大的应用程序来执行常见的数据争用任务,例如:将架构应用于数据过滤和屏蔽数据更改数据结构(例如,展平嵌套数据)更改序列化格式丰富数据流统一多个数据流数据争用意味着将原始数据更改为更易于使用的形式。数据科学的这一方面本质上涉及到收集到的数据,对其进行清理并将其转换为不同的格式,淘客公众号,这样可以增加数据的价值。仅仅拥有你所拥有的数据是不够的,你需要利用它,适当地利用这些数据并将其转换成你可以分析的东西。因此,你将能够从中获得更大的价值。假设你有数据,你想建立一些分析。它来自多个来源可能是同一系统的不同实例。您需要将这些源统一到一个单一的源中,沿途重新格式化数据,云服务市场,并对数据应用一个模式,以便可以轻松地对其进行下游分析。apachekafka和KSQL是一个很好的选择。为什么是卡夫卡?因为我们可以使用它来集成源系统和下游系统,以接近实时的方式,并且能够在不干扰其他组件的情况下添加和交换源和目标。此外,我们可以根据需要重新处理数据并与其他系统共享,使它们能够利用我们所执行的数据清理和数据争用。 在本文中,我们将了解如何从REST源获取数据,对其进行清理并与KSQL进行数据争用,然后将其流式传输到Google云存储(GCS)和googlebigquery,以便在googledatastudio中进行分析和可视化。我们用的是汇合云™ 托管我们的Kafka代理,但如果您愿意,它将在本地集群上工作。您可以在GitHub上找到本文中使用的所有代码,包括Docker Compose文件。获取数据信贷:本节使用环境署的实时数据API(beta)提供的降雨数据。 我在这里工作的数据来自英国政府环境署提供的公共API。它包含了从全国各地不同气象站获取的有关降雨量、河流水位等信息的读数。每个站点都有自己的restapi端点。我们将从其中的四个数据中提取数据,使用Kafka Connect对数据源进行轮询。每个端点的数据结构略有不同,这给了我们一个很好的数据清理问题的实例。我使用的是Kafka Connect REST源连接器,它是由Lenny Löfberg编写的社区贡献的连接器。使用这个连接器,我可以指定一个REST端点和一个轮询间隔,得到的有效负载被写入Kafka主题。首先,创建连接器(每个站点一个):{"name":"source_rest_flood-monitoring-L2404","配置":{"连接器.class": "卡夫卡网站.connect.rest.RestSourceConnector","rest.source.url": "http://environment.data.gov.uk/flood-monitoring/id/stations/L2404",[...]}}Giton集线器完整代码在这个过程中,每个气象站都有一个卡夫卡主题,每个气象站都有数据:$ccloud消费——从一开始——主题洪水监测-059793 | jq"{"@context":","元":{"publisher":"环境署",[...]]},"项目":{[...]"label":"雨量站","纬度":53.966652,"长":-1.115089,"措施":{[...]"最新消息":{"dateTime":"2018-08-22T05:30:00Z","值":0[...]您将从上面的JSON示例中注意到它是嵌套数据。对于本站(059793),云服务器设备,只有一个度量。但是,对于另一个站(3680),有多个测量读数,并且测量值以阵列形式提供:[...]"项目":{"@id":","label":"雨量站",[..."措施":[{[...]"label":"降雨-倾翻式""斗式""雨量器-t-15_min-mm","最新消息":{"dateTime":"2018-08-30T04:00:00Z","值":0[...]},},{[...]"label":"temperature-dry_-bulb-i-15_min-deg_摄氏度","最新消息":{"dateTime":"2018-08-30T04:00:00Z","值":8[...]},}],[...]这种变化的数据结构是数据集成和ETL中常见的问题。我们将在本文中看到KSQL如何帮助解决这种挑战。现在,我们希望采用这四个Kafka主题并构建一个流处理应用程序,该应用程序将填充来自这四个主题的转换数据的单个统一输出主题。这将处理Kafka主题中已经存在的数据,以及到达的每个新消息。声明架构传入的数据是JSON,但没有声明的模式。作为一个框架,Kafka Connect可以自动注册入站数据的模式,并将其序列化为apacheavro™,但这里的REST连接器基本上只是从REST端点提取字符串数据,而字符串数据恰好是JSON。所以,我们要用KSQL做的第一件事就是为每个主题的源数据声明一个模式。请注意,模式略有不同,以考虑来自包含阵列的其中一个站的数据。创建溪流洪水监测_059793\(元结构\项目结构\)WITH(KAFKA_TOPIC='flood-monitoring-059793',VALUE_FORMAT='JSON');[...]创建溪流洪水监测_3680\(元结构\项目结构\)WITH(KAFKA_TOPIC='flood-monitoring-3680',VALUE_FORMAT='JSON');注册Kafka主题并定义模式后,我们可以列出:ksql>显示流;流名称| Kafka主题|格式----------------------------------------------------------------洪水监测|洪水监测-3680 | JSON洪水监测| L2404 |洪水监测-L2404 | JSON洪水监测|洪水监测|洪水监测-059793 | JSON洪水监测----------------------------------------------------------------在我们对数据做任何其他操作之前,淘客平台,我们可以使用KSQL的能力重新序列化并将原始JSON数据转换为Avro。这里的优点是,任何下游应用程序(无论是另一个KSQL进程、Kafka Connect或Kafka使用者)都可以直接处理来自主题的数据,并从汇合的模式注册表中获取数据的模式。要执行此操作,请使用CREATE STREAM…AS SELECT语句,并将VALUE_格式指定为with子句的一部分:创建溪流洪水监测_3680_AVRO\使用(VALUE_FORMAT='AVRO')作为\从洪水监测_3680中选择*;如果您想更改分区和复制因子,您也可以在这个阶段定义它。在本练习的其余部分,我们将继续使用原始的JSON主题,并在稍后应用Avro序列化。使用嵌套数据因为源主题中的数据是嵌套的JSON,如何学习大数据,所以我们将父列的数据类型声明为STRUCT。要使用KSQL访问数据,请使用->运算符:选择项目->站点参考\项目->earegionname\项目->度量->参数名称\项目->度量->最晚阅读->日期时间\项目->度量->最新读数->值\项目->度量->单位名称\来自洪水监测区1;L2481 |东北|水位| 2018-08-22T13:00:00Z | 5.447 | mAOD对于数组(声明为数组的一部分)的读数,请使用方括号指定索引:ksql>选择项目->站点参考\项目->earegionname\项->度量值[0]->parameterName\项->度量值[0]->最新读取->日期时间\项->度量值[0]->latestreading->value\项->度量值[0]->unitname\来自洪水监测_3680限制1;3680 |米德兰|降雨量| 2018-08-30T04:00:00Z | 0.0 |毫米统一来自多个流的数据从每个主题和阅读类型中提取一个样本记录,在手动整理时,我们可以看到以下表格:站点参考车站区域测量类型时间戳我

上一篇:把阿帕奇卡夫卡的有哪些云服务器-力量交给数据科学家
下一篇:溪流与桌子:智慧农业物联网系统-一枚硬币的两面

分享到:
0
最新资讯
阅读排行