背景
用户在手机和pc端使用客户业务产品,比如浏览网页,购买商品,查看文档,观看视频,直播,IOT场景;会产生大量的用户行为数据,主要包括:
- 非结构化数据:日志(前端事件埋点日志,服务端处理事件日志),还有些非结构化的图片,音视频数据等等,主要存放在文件存储系统中;
- 结构化和半结构化数据: 用户操作产品写入的结构化数据存放于数据库表中,将文档型半结构化的数据放入文档数据库中;
需要分析用户的行为数据,进行决策;分为实时流式处理和离线批处理:
- 实时流处理,主要用于实时展现客户端看板,后台BI实时分析,实时风控/推荐,异常报警等场景;
- 离线批处理,分析用户历史数据,进行推荐算法等机器学习算法模型训练使用,数据仓库中根据不同维度对数据过滤聚合,进行上卷下钻分析,比如计算DAU,WAU,MAU,转化率(购买率,注册率)分析等,通常对数据建设投入多的话, 会把用户产生的结构化非结构化的数据都存下,放在一个大的池子里待使用时进行分析,即所谓的数据湖,围湖而建挖掘数据价值;而数仓相对精细化的分析,前置建模建表分析;
对此进行方案分析,本文将介绍一种实时离线处理分析用户行为数据方案,即能帮助企业低成本地使用海量数据,又能更快速地响应业务需求,同时借助亚马逊云科技的托管服务,能够快速实施和轻松运维。
操作概括
- 权限设置:根据公司业务组织,分配对应服务资源权限,比如系统管理员OP, 开发人员DEV, 还有业务管理员OP;组织架构权限建设;
- 服务基础架构:首先需要搭好基础服务框架,结合云厂商服务,进行可水平垂直自动扩展,高容错性,低成本,可观测监控,易于维护,持续集成发布的稳定性架构建设;
- 业务迭代数据建设开发:架子搭好之后,需要对特定的业务场景进行数据建模,AI模型训练,挖掘出数据的价值,进行决策;服务代码质量,框架建设;
解决方案
使用aws 现有产品服务组件进行搭建,主要分为三个阶段,数据采集,数据处理存储,数据分析,整体架构如下:
数据源采集
- 访问日志和请求事件:用户在手机端通访CloudFront 内容分发服务, 会生成用户行为访问日志,这些实时日志中会有产品中定义的行为分析埋点记录事件,存放在S3中,通过Lambda无服务函数写入kinesis data streams中;如果需要实时处理,需要开启CloudFront实时日志功能,将数据写入Kinesis Data Streams中,会有几秒的处理延时;还有一种方式是服务端在线实时通过使用aws SDK方式直接写入记录事件数据,可通过无服务部署的lambda函数写入或者API gateway配置写入(参考:在 API Gateway 中创建 REST API 作为 Amazon Kinesis 代理),常用于实时异常报警和统计;
- 数据库数据: 存放在数据库中的数据,需要将数据同步存放在数仓和数据湖中,进行离线分析; 数据库的数据可以通过aws DMS服务来支持存量增量数据同步至Kinesis Data Streams中,具体方案可以参考**使用 AWS DMS 将更改数据流式传输到 Amazon Kinesis Data Streams**;也可以使用flink CDC connector on aws EMR支持同步(需要理解反压机制,以便触发时看是否增大下游消费能力提高吞吐), 可参考多库多表场景下使用Amazon EMR CDC实时入湖最佳实践,注意数据库表中的数据字段需要规范,需要一个更新时间字段方便数据顺序同步,比如mysql
update_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
;
您可以参阅 AWS 白皮书AWS Cloud Data Ingestion Patterns and Practices,了解有关数据采集模式的更多详细信息;
将数据采集写入消息队列中主要是方便多个消费方来处理流,以及服务之间的整体解耦,可作为数据流缓存层,数据流量突增时,增加分片数,提高吞吐,当然整体吞吐量也取决于下游数据的消费能力,引入消息队列,以pull方式进行消费(主动消费),不至于将下游服务打挂,而且方便下游异常消费重启时,继续在未消费点开始消费;这里提供两种方案,一种将数据写入Kinesis Data Streams中,一种将数据写入MSK(aws 托管的kafka集群服务,如果直接自己搭建维护,成本比较高,对接其他aws服务相对复杂些) 中;
- 数据写入Kinesis Data Streams中主要是方便多个消费方来处理流,以及服务之间的整体解耦,数据流缓存层,更重要的是方便使用aws Kinesis方案,减少运维成本,对接也丰富,主要是方便对接内部Kinesis相关服务组件;
- 数据写入MSK中,使用的开源解决方案对接;下游对接kinesis Data Analytics 服务需要通过flink 计算引擎加载对应kafka connector包,从kafka中获取数据源进行分析;如果下游对接其他服务,比如:Kinesis Data Firehose,将数据传输转化写入数据湖S3中存储,写入Redshift数仓中分析;使用SNS 进行报警等; 需要引入无服务框架lambda函数,通过使用kafka client SDK库来进行生产/消费处理;
具体使用结合公司实际场景而定,如果想想通过kafka对接更多的开源大数据服务框架, 可以选择MSK方案,额外需要开发维护lambda函数服务;使用Kinesis Data streams 很方便接入aws相关服务,减少额外的开发维护成本,不过如果对接其他开源大数据服务框架,同样需要引入lambda函数服务;本文采用将数据写入Kinesis Data streams 服务,提供给下游对接。
数据处理存储
数据处理分析,分为实时处理的流数据和离线处理批数据处理存放;
实时处理使用AWS Kinesisi Data Analytics(KDA)进行分析,支持三种分析方式:
- 使用老的方式SQL 应用程序进行分析处理,不能使用编程语言python/Scala来调用api来进行精细化操作(需要定义UDF包提供使用),以及即席查询;
- 使用基于 Apache Flink 的开源库在 Kinesis Data Analytics 中构建 Java ,Scala 和Python 应用程序,Apache Flink 是处理数据流的常用框架和引擎,Kinesis Data Analytics现使用flink支持最高版本是1.13;在开发应用时,需要打印日志,以便使用CloudWatch 日志监控应用程序的性能和错误状况;如果应用程序出现bug或者服务异常中断可以使用检查点(CheckPoints)和保存点(SavePoints 生成快照)在 Kinesis Data Analytics 应用程序中实现容错功能; 同时Kinesis Data Analytics 可弹性扩缩应用程序的并行度,以适应大多数场景下的源数据吞吐量和操作复杂性,Kinesis Data Analytics 监控应用程序的资源 (CPU) 使用情况,并相应地弹性地向上或向下扩展应用程序的并行度; 使用算子(operators)进行数据流拓扑计算;同时通过connector可直接引入jar包接入数据源或者sink到下游服务,可以在mvn库中找到,比如kinesis stream connector;
- 在基于 Apache Flink 的开源库构建应用程序的基础上,结合Glue定义数据库表存放数据catalog元数据,通过zeppelin增加了可视化即席查询, 直接可以在notebook上编写flink 流/批**SQL**(notice: flinkSQL和KDA SQL有所不同,特别是在window上有些区别), 以及编写调用flink API的Scala,Python程序, 具体见zeppelin flink解释器; 这种方式是相对于第二种方式,在方便运维管理的基础上更加容易上手,直接在notebook上就可以进行即席查询, 查询会话还可以保留或存放本地,还可以作为测试开发调试的平台,构建好处理程序,可直接转化成持久化的应用程序部署;当然这部分费用相比前面的分析方式相对多些,启动的studio notebook 费用,Kinesis 处理单元(KPU)将按小时收费;可参考实例教程,使用 Kinesis Data Analytics Studio 和 Python 以交互方式查询数据流
整体来说: Amazon Kinesis Data Analytics 可以轻松地实时分析流数据,并使用标准 SQL、Python 和 Scala 构建由 Apache Flink 提供支持的流处理应用程序。特别是notebook功能只需在AWS 管理控制台中单击几下,写下分析SQL,就可以启动无服务器笔记本来查询数据流并在几秒钟内获得结果。Kinesis Data Analytics 降低了构建和管理 Apache Flink 应用程序的复杂性。
离线处理的数据主要是通过AWS Kinesis Data Firehose 传输流写入下游服务存储,将数据写入湖仓系统中, 用于后续的数据分析,以及前期规划好的数据分析;选用firehose的原因是有原始备份机制存放于S3中,即使数据传输错误时,数据传输中不会丢失数据,同时内置lambd函数在传输之前将数据转化处理,同时也支持用Glue定义表来转换大数据相关的记录格式;
原始分析数据直接存放于S3中,11个9的保障可以非常可靠保证数据不丢失,通过Glacier持久冷热存放,降低成本,方便后面追查数据,以及通过Athena查询引擎结合Glue来定义表从S3中挖掘出更有价值的数据;数据存放下来之后,结合大数据相关平台,EMR进行海量数据处理(PB级别);同时结合Glue 进行ETL 数据处理,集成编排成DAG工作流,可视化管理这些ETL任务作业,也可以迁移调度平台比如Azkaban的工作流迁移至Glue ETL工作流,参考Amazon Glue ETL作业调度工具选型初探;
提前业务场景数据分析建模,使用Redshift来存放不同维度(DIM)的表,分层(ODS->DWD->DWM->DWS)建设数据仓库; 选用redshift性价比比较高,开箱即用,存放结构化,半结构化数据,数据列式存储,分片存放,计算和存储分离,很方便无服务化,在数秒内轻松运行和扩展分析,而无需调配和管理数据仓库; 和数据湖打通,可以使用 Redshift Spectrum 在 Amazon S3 文件中查询数据,而不必将数据加载到 Amazon Redshift 表中,提高关联查询;还可以对接机器学习ML,通过CREATE MODEL DDL语句下推到Amazon SageMaker,从S3中加载数据进行训练;
数据分析
主要是通过分析引擎从数据存储获取数据,根据维度展现看板,进行实时数据查看,离线分析,以及可视化即席分析:
- 实时结果数据查看:通过DynamoDB获取实时结果数据,主要是Key/Value数据,同时查询速度很快,利用DAX实现内存中加速;通过lambda对接API Gateway提供数据接口,方便业务场景实时展现,比如监控查看异常数据,访问计数等结果展现;这些结果数据可以直接通过SNS发送邮件或者短信进行通知;
- 通过OpenSearch提供实时搜索服务,比如日志搜索实时定位追查问题,通过KDA实时分析写入;在线检索商品,这些数据主要来源于数据库,异构成OpenSearch索引数据进行检索,可通过flink CDC on EMR来同步数据到OpenSearch中;
- S3中的数据通过Athena在Glue/Hive上建表元数据,使用SQL查询,几分钟内可以查询到结果;
- 存放在Redshift数据仓库中的数据,通过Amazon QuickSight接入进行BI分析,响应时间在秒级别,只需要在界面上选择图表组合成一个仪表盘展现即可,方便快速决策;支持AWS区域接入IP范围;
组织用户角色权限管理
以上这些谈到的服务需要进行安全访问,通过IAM定义策略角色分配权限进行服务授权,来进行安全访问,有两种情况:
-
服务资源之间的访问,需要分配读写权限,需要把这些策略赋予莫个角色,然后资源通过这个赋予资源权限的角色来访问对应资源;
-
还有就是操作者访问服务资源,需要分配不同资源的读写权限,可以根据公司组织架构来管理每个员工的权限使用范围,非常方便;
角色权限设置规则如下:IAM身份设置用户组 dev, op, biz user,后续 细分在按组织部门进行建组;
- op: 理论上构建完一组资源可以分配对应权限策略角色Role,给予服务资源的运维管理操作;
- dev: 只有使用开发资源权限,比如lambda编辑发布权限,数据库读写权限,而非管理删除权限策略Role;
- bizUser: 业务操作者,大部分只有读权限策略Role,没有写操作权限;
- admin: 管理员,Administrator权限,可以访问任何资源
还有是创建的应用是给外部服务用户使用,比如Web,移动端用户,通过Amazon Cognito来注册登录管理用户,也可以通过第三方登录进行身份验证;两个主要组件是用户池和身份池:用户池是为应用程序提供注册和登录选项的用户目录,身份池授予用户访问其他 AWS 服务的权限。请参考Amazon Cognito常见场景。
场景
CASE 实时异常事件报警展现
打点事件数据:(实体数据,通过同步实体表数据流进行关联jion操作)
field | type | desc |
---|---|---|
eventId | string | 用户行为事件id |
action | string | 统一定义的事件动作,比如 浏览文档:viewDoc |
userId | string | 触发事件的用户id |
createdAt | string | 触发事件时间 |
objectId | string | 操作对象id, 比如文档id |
bizId | string | 所属业务id |
errorMsg | string | 错误信息 |
实时过滤出异常事件KDA SQL
-- ** Continuous Filter **
-- Performs a continuous filter based on a WHERE condition.
-- .----------. .----------. .----------.
-- | SOURCE | | INSERT | | DESTIN. |
-- Source-->| STREAM |-->| & SELECT |-->| STREAM |-->Destination
-- | | | (PUMP) | | |
-- '----------' '----------' '----------'
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
-- reference:
-- https://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/sqlref/analytics-sql-reference.html
-- https://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/streaming-sql-concepts.html
-- https://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/sqlref/kinesis-analytics-sqlref.pdf
-- abnormality event stream
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
"eventId" varchar(64),
"action" varchar(256),
"userId" varchar(64),
"objectId" varchar(64),
"bizId" varchar(64),
"errorMsg" varchar(1024),
"createdAt" varchar(32)
);
-- Filter errorMsg like panic/error pump
CREATE OR REPLACE PUMP "ERROR_PANIC_STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "eventId", "action", "userId", "objectId", "bizId", "errorMsg","createdAt"
FROM "SOURCE_SQL_STREAM_001"
WHERE "errorMsg" LIKE '%[PANIC]%'
or "errorMsg" LIKE '%[panic]%'
or "errorMsg" LIKE '%[ERROR]%'
or "errorMsg" LIKE '%[error]%';
每1分钟warn数目超过10次的SQL(滚动窗口)
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
"eventId" varchar(64),
"action" varchar(256),
"userId" varchar(64),
"objectId" varchar(64),
"bizId" varchar(64),
"errorMsg" varchar(1024),
"createAt" varchar(32),
"INGREST_ROW_TIME" varchar(32),
"APPROXIMATE_ARRIVAL_TIME" varchar(32)
);
-- Filter errorMsg like warning pump
-- Aggregation with time window(u can use stagger windows,tumbling windows, sliding windows)
-- use tumbling windows for this case
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "eventId", "userId", "objectId", "bizId", "createAt"
"errorMsg","action",
STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "INGREST_ROW_TIME",
STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND) AS "APPROXIMATE_ARRIVAL_TIME",
-- STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND) AS "EVENT_TIME",
COUNT(*) AS "action_warn_count"
FROM "SOURCE_SQL_STREAM_001"
WHERE "errorMsg" LIKE "% WARNNING %" or "errorMsg" LIKE "% warnning %"
GROUP BY "action",
STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND)
-- STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND)
Having "action_warn_count" >= 10;
服务构建
基于以上服务搭建,需要用户去aws云平台上点击,配置使用, 特别是构建表,以及数据传输,属性和安全访问权限角色的管理,随着业务复杂度变高,基础服务也随之增多,可配置化的东西越来越多,会带来灾难性的后果,最终变得不可控,人力管理运维成本增加;aws平台提供了AWS CloudFormation 允许您通过将基础设施视为代码来建模、预置和管理 AWS 和第三方资源。即IaC,也是Devops经常所做的事情,可以使用相关IaC工具来自动化构建一个系统,常用的场景是CI/CD支持快速变化的业务需求开发;AWS一直提供基础服务的配置API,有相关的SDK可供使用,也有aws
工具来操作这些基础服务资源; 后面提供了一种开源软件开发框架AWS Cloud Development Kit (AWS CDK),可使用熟悉的编程语言来定义云应用程序资源;将云上硬件资源可编程化,可以很方便的实现自动化运维管理,充分利用云上的资源来组装construct成一个模块栈stack, 各个模块栈最终合成一个落地解决方案,方便开箱即用(安装软件一样),降低云构建的复杂性;cdk construct分3个层次的封装,L1是最低级别初始封装,直接对应CloudFormation配置模版文件映射,称之为CFN 资源,必须配置每一项属性,需要深入了解资源模型的详细信息;L2是是对了L1的组合封装,使用资源属性默认值,比如new VPC;L3则是一种模式(pattern),通过多种资源组合成一个常见资源架构,比如 new Fargate无服务化容器集群;具体参考见:constructs
通过cdk workshop 大概花半天时间学习这里的demo就可以试着搭建相关的服务, 也有更多的awesome workshop可供参考和学习的,并且 aws blog builders' lib也会有很多相关的解决方案提供学习;当然需要很好的架构aws服务,需要多落地实践,深入服务细节(查看帮助文档docs),结合需求,才能构建一个相对完美的解决方案;当然前期生产落地需要使用CDK来进行架构推理。
这里结合上述需求,使用CDK来搭建一些stack, 方便数据流分析管道的组装,后续也方便与其他contstucts进行组装;主要是分为以下stack:
-
CDK-Workshop-Lambda-KDS-stack:
APIGateway->lambda(put KDS record & hit counter in dynamodb)->lambda(hello) dynamotableviewer
-
DMS-KDS-stack
-
KDS-KDA-sql-Lambda-Dynamodb-stack: KDS->KDA->lambda->dynamodb dynamotableviewer
-
KDS-KDF-S3-stack: KDS->KDF->S3
-
KDS-KDA-flink-OpenSearch-stack
-
KDS-KDA-flink-KDS-stack: eg: for near-realtime-warehouse, make a pipeline (ODS->DWD->DWM->DWS) sink to Redshift; like this tencent news Pipeline pattern
-
KDS-KDF-Redshift-stack:
-
Redshift-QuickSight-stack
-
S3-Glue-Athena-stack
这里主要部署CDK-Workshop-Lambda-KDS-stack, KDS-KDA-sql-Lambda-Dynamodb-stack 和KDS-KDF-S3-stack 搭建SQL流式处理用户行为数据,具体见代码;其他stack可以后续进行扩展进行构建,推理整体基础设施架构。
构建
首先需要安装CDK, 具体查看入门教程,执行如下命令:
# download code to start
git clone https://github.com/weedge/user-behavior-analytics-cdk.git && cd user-behavior-analytics-cdk
# tips: cdk load cdk.json + cdk.context.json to run by js on node
# list stacks
cdk ls
# deploy KDS-KDA-sql-Lambda-DynamoDB-stack with KDS-KDF-S3-stack(need created kinesis data stream)
cdk deploy KDS-KDA-sql-Lambda-DynamoDB-stack
# deploy CDK-Workshop-Lambda-KDS-stack for hit event stream; lambda func put record to KDS, dependcy KDS
cdk deploy CDK-Workshop-Lambda-KDS-stack
部署完之后,会输出kinesis数据流的名称以及用于查看数据结果地址;
开始写入测试数据,需要使用python3, 使用pip3 安装依赖包
# init python virtural env
python3 -m venv .venv && source .venv/bin/activate
# install boto3 faker
pip3 install boto3 faker
# run test script, wait KDA run, put record to KDS
python3 src/scripts/producer-kds-test.py
运行测试脚本,输入region地域名称,比如us-east-1, 等待启动后,输入数据流名称,开始发送数据;
每1秒发一次数据写入KDS中,发了10次含有错误事件,发了10次随机事件,总共20条;
也可以使用aws提供KDS数据生成器:https://github.com/awslabs/amazon-kinesis-data-generator
从刚才部署输出结果地址查看含有错误的数据已经有10条展现出来(数据获取式前端每隔几秒轮训获取api数据,实时展现可通过API Gateway WebSocket 来支持);历史数据可以在KDF配置的S3目标中点击查看,原始数据可以下载gz包进行解压查看;
通过使用提供给前端访问的事件api(api地址在部署CDK-Workshop-Lambda-KDS-stack后输出的访问地址)来写入异常[error]
数据
curl -XPOST -iv https://{APIGateway}.execute-api.{region}.amazonaws.com/prod/event -d '{"eventId":"1-1-1","bizId":"123123","objectId":"123","action":"test","errorMsg":"[error]","userId":"1231321"}'
通过刚才部署KDS-KDA-sql-Lambda-DynamoDB-stack的结果地址可以 查看异常数据已经实时写入。
最后将部署资源清除,依赖删除(和卸载软件一样)。
# destroy resources with dependent resources
cdk destroy KDS-KDF-S3-stack
cdk destroy KDS-KDA-sql-Lambda-DynamoDB-stack
cdk destroy CDK-Workshop-Lambda-KDS-stack
# or cdk destroy --all
cdk destroy --all
代码结构
├── cmd -- golang cmd bin dir
├── docs -- help doc
├── infra -- infrastructures stack
│ └── lib -- cdk constuct in stack
├── src -- source code to run
│ ├── kinesis-analytics-pyflink -- KDA python scripts use flink python api
│ ├── kinesis-analytics-sql -- KDA sql
│ ├── lambda -- js,python,golang lambda func
│ ├── redshift-sql -- redshift sql
│ └── scripts -- local run test code by use aws sdk
├── test -- test cdk logic
参考
sam cli local debug lambda, The Complete AWS SAM workshop, AWS re:Invent 2022 - Best practices for advanced serverless developers
streaming cdk: https://github.com/aws-samples/streaming-solution-aws-cdk
kds and msk cdk: https://github.com/aws-solutions/streaming-data-solution-for-amazon-kinesis-and-amazon-msk
Redshift cdk : https://github.com/miztiik/redshift-demo
Glue cdk: https://github.com/aws-samples/glue-workflow-aws-cdk
opensearch cdk: https://www.luminis.eu/blog/cloud-en/deploying-a-secure-aws-elasticsearch-cluster-using-cdk/
mysql dms cdk: https://aws.amazon.com/cn/blogs/database/accelerate-data-migration-using-aws-dms-and-aws-cdk/
aurora mysql dms kds opensearch cdk: https://github.com/aws-samples/aws-dms-cdc-data-pipeline.git
kda-zeppelin-flink-py: https://aws.amazon.com/cn/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/
Implementing Microservices on AWS