跳到内容

流数据的分析

关于 Bytewax

Bytewax 是一个专为 Python 开发者设计的开源流处理框架。
它允许用户构建具有与 Flink、Spark 和 Kafka Streams 类似功能的流数据流水线和实时应用,同时提供友好且熟悉的界面,并与 Python 生态系统 100% 兼容。

使用 Bytewax 和 ydata-profiling 进行流处理

数据分析是成功开始任何机器学习任务的关键,它指的是透彻理解我们的数据的步骤:包括其结构、行为和质量。
简而言之,数据分析涉及分析与数据格式和基本描述符相关的方面(例如,样本数量、
特征数量/类型、重复值)、其内在
特性(例如,缺失数据的存在或特征不平衡),以及数据收集或处理过程中可能出现的其他复杂因素(例如,错误值或不一致的
特征)。

包版本

ydata-profiling 与 bytewax 的集成适用于
任何版本 >= 3.0.0

模拟流

下面的代码用于模拟数据流。当流数据源可用时,这不是必需的。

导入
1
2
3
4
5
6
from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main

然后,我们定义数据流对象。之后,我们将使用一个无状态的 map 方法,在该方法中传入一个函数,用于将字符串转换为 datetime 对象,并将数据重构为 (device_id, data) 格式。map 方法将以无状态的方式对每个数据点进行更改。我们修改数据形状的原因是为了在后续步骤中轻松地按设备对数据进行分组,以便单独对每个设备的数据进行分析,而不是同时对所有设备进行分析。

设置数据流
flow = Dataflow()
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))

# parse timestamp
def parse_time(reading_data):
    reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
    return reading_data

flow.map(parse_time)

# remap format to tuple (device_id, reading_data)
flow.map(lambda reading_data: (reading_data["device"], reading_data))

现在我们将利用 bytewax 的有状态功能,在定义的时间段内收集每个设备的数据。ydata-profiling 期望数据随时间变化的快照,这使得 window 操作符成为实现这一目标的完美方法。

在 ydata-profiling 中,我们可以为特定上下文的数据帧生成汇总统计信息。例如,在此示例中,我们可以生成针对每个 IoT 设备或特定时间帧的数据快照。

分析流快照

分析不同的数据快照
from bytewax.window import EventClockConfig, TumblingWindow

# This is the accumulator function, and outputs a list of readings
def acc_values(acc, reading):
    acc.append(reading)
    return acc

# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):
    return reading["ts"]


# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))

# And a tumbling window
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))

flow.fold_window("running_average", cc, wc, list, acc_values)

flow.inspect(print)

定义快照后,利用 ydata-profiling 就像为我们想要分析的每个数据帧调用 ProfileReport 一样简单。

import pandas as pd
from ydata_profiling import ProfileReport


def profile(device_id__readings):
    print(device_id__readings)
    device_id, readings = device_id__readings
    start_time = (
        readings[0]["ts"]
        .replace(minute=0, second=0, microsecond=0)
        .strftime("%Y-%m-%d %H:%M:%S")
    )
    df = pd.DataFrame(readings)
    profile = ProfileReport(
        df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}"
    )

    profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
    return f"device {device_id} profiled at hour {start_time}"


flow.map(profile)

在此示例中,我们将图像作为 map 方法中函数的一部分写入本地文件。这些可以通过消息传递工具报告出来,或者我们可以在未来将它们保存到某些远程存储中。配置文件完成后,数据流需要一些输出,因此我们可以使用内置的 [StdOutput]{.title-ref} 来打印被分析的设备以及在 map 步骤中从 profile 函数传递出来的分析时间。

flow.output("out", StdOutput())

有多种执行 Bytewax 数据流的方法。在此示例中,我们使用相同的本地机器,但 Bytewax 也可以在多个 Python 进程中、跨多个主机上、在 Docker 容器中、使用 Kubernetes 集群以及更多方式运行。在此示例中,我们将继续使用本地设置,但我们鼓励您在流水线准备好过渡到生产环境时查看管理 Kubernetes 数据流部署的 waxctl

假设我们在与数据流定义文件相同的目录中,我们可以使用以下命令运行它

python -m bytewax.run ydata-profiling-streaming:flow

然后,我们可以使用分析报告来验证数据质量,检查模式或数据格式的更改,并比较不同设备或时间窗口之间的数据特性。

我们可以进一步利用比较报告功能,它以直接的方式突出显示两个数据概要之间的差异,使我们更容易检测需要调查的重要模式或需要解决的问题。

比较不同的流
1
2
3
4
5
6
7
#Generate the profile for each stream
snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")

#Compare the generated profiles
comparison_report = snapshot_a_report.compare(snapshot_b_report)
comparison_report.to_file("comparison_report.html")

现在您已准备好开始探索您的数据流了!Bytewax 负责处理将数据流处理和构建为快照所需的所有过程,然后可以通过 ydata-profiling 生成数据特性的综合报告来对这些快照进行汇总和比较。