Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Sign in
Toggle navigation
W
warehouse
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
侯双强
warehouse
Commits
2d7aaa52
Commit
2d7aaa52
authored
Sep 16, 2021
by
侯双强
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
内容订单数据
parent
193a2495
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
586 additions
and
14 deletions
+586
-14
ads_user_content_order_records.py
edw/ads/user/ads_user_content_order_records.py
+63
-0
ads_user_learn_course.py
edw/ads/user/ads_user_learn_course.py
+15
-12
dwd_user_content_order.py
edw/dwd/user/dwd_user_content_order.py
+103
-0
dws_user_content_order.py
edw/dws/user/dws_user_content_order.py
+366
-0
scheduled_tasks.py
edw/tasks/scheduled_tasks.py
+39
-2
No files found.
edw/ads/user/ads_user_content_order_records.py
0 → 100644
View file @
2d7aaa52
# -*- coding: utf-8 -*-
'''
内容订单数据(全量同步,订单量多了,再用增量),定时任务,每10分钟运行一次
注意:在ads层需要隐藏手机号
'''
import
logging
import
sys
from
common.mysql_uitl
import
fetch_all
,
save_result
logging
.
basicConfig
(
format
=
"
%(asctime)
s
%(name)
s:
%(levelname)
s:
%(message)
s"
,
datefmt
=
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
level
=
logging
.
INFO
)
file_name
=
sys
.
argv
[
0
]
def ads_user_content_order_records():
    '''Query the content-order records and hand them to ``save_result``.

    The rows come from ``query_user_content_order`` and are passed to
    ``save_result`` together with the target schema ``tamp_data_ads``, the
    target table ``ads_user_content_order_records`` and the module-level
    ``file_name`` (``sys.argv[0]``).
    '''
    # Fetch first, then save, so a failing query never reaches save_result.
    records = query_user_content_order()
    target_schema, target_table = 'tamp_data_ads', 'ads_user_content_order_records'
    save_result(target_schema, target_table, records, file_name)
def query_user_content_order():
    '''Fetch successful content orders joined with their coupon source.

    Reads ``tamp_data_dws.dws_user_content_order`` (alias ``o``), left-joins
    ``tamp_pay.coupon_distribute`` (alias ``c``) on ``order_id``, keeps rows
    whose ``order_status`` is ``'SUCCESS'`` and orders them by ``data_dt``
    and ``user_id``, both descending.  ``c.ref_type`` is mapped to a
    human-readable ``coupon_collection`` label and ``coupon_amount`` is
    divided by 100 and rounded to two decimals.

    Returns whatever ``fetch_all(sql, None)`` returns.

    NOTE(review): ``coupon_amount`` is not qualified with a table alias; this
    only resolves if exactly one of the two joined tables has that column --
    consider qualifying it.
    NOTE(review): if one order can have several ``coupon_distribute`` rows,
    the left join will duplicate that order in the result -- confirm.
    '''
    # The name of this function is resolved at runtime and used as the
    # prefix of the start/success log lines.
    current_frame = sys._getframe()
    function_name = current_frame.f_code.co_name
    logging.info(f'{function_name} start')

    sql = '''
select o.id
,o.order_id
,o.data_dt
,o.user_id
,o.real_name
,o.user_name
,o.nickname
,o.team_id
,o.res_id
,o.res_name
,o.payment_amount
,case when (c.ref_type = 306) then '活动领取'
when (c.ref_type = 320) then '金刚区领券中心领取'
when (c.ref_type = 321) then '课程资源页领取'
when (c.ref_type = 322) then 'h5页面领取'
else ''
end as coupon_collection -- 优惠券领取位置
,round(coupon_amount / 100,2) as coupon_amount
,o.ab_pay_mode
,o.pay_mode
,o.res_type
,o.res_type_name
,o.order_type
,o.create_time
,o.pay_time
,o.complete_time
from tamp_data_dws.dws_user_content_order o
left join tamp_pay.coupon_distribute c
on o.order_id = c.order_id
where o.order_status = 'SUCCESS'
order by o.data_dt desc, o.user_id desc
'''

    # No bind parameters: the statement is fully static.
    order_rows = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return order_rows
if __name__ == "__main__":
    # Script entry point: run one full sync of the ADS content-order records.
    ads_user_content_order_records()
\ No newline at end of file
edw/ads/user/ads_user_learn_course.py
View file @
2d7aaa52
...
...
@@ -21,22 +21,20 @@ file_name = sys.argv[0]
def
ads_user_learn_course
(
data_dt
):
single_course_invite_dict
=
query_single_course_invite_people_record
(
data_dt
)
course_invite_dict
=
query_course_invite_people_record
(
data_dt
)
# 合并单节课程和课程包邀请人数,没有严格去重邀请人数,只是对单节课程,邀请人数做了去重,课程包邀请人数做了去重
invite_people_record_result_dict
=
merge_course_invite_people
(
single_course_invite_dict
,
course_invite_dict
)
learn_course_details_dict
=
query_user_learn_course_details
(
data_dt
)
merge_learn_course
(
learn_course_details_dict
,
invite_people_record_result_dict
)
# 保存明細数据
save_result
(
'tamp_data_ads'
,
'ads_user_learn_course_details'
,
learn_course_details_dict
,
file_name
)
single_course_invite_summary_dict
=
query_single_course_invite_people_summary
()
course_invite_summary_dict
=
query_course_invite_people_summary
()
invite_people_summary_result_dict
=
merge_course_invite_people
(
single_course_invite_summary_dict
,
course_invite_summary_dict
)
learn_course_details_dict
=
query_user_learn_course_details
(
data_dt
)
learn_course_summary_dict
=
query_user_learn_course_summary
()
#
merge_learn_course
(
learn_course_details_dict
,
invite_people_record_result_dict
)
merge_learn_course
(
learn_course_summary_dict
,
invite_people_summary_result_dict
)
# 保存数据
save_result
(
'tamp_data_ads'
,
'ads_user_learn_course_details'
,
learn_course_details_dict
,
file_name
)
# # 保存汇总数据
save_result
(
'tamp_data_ads'
,
'ads_user_learn_course_summary'
,
learn_course_summary_dict
,
file_name
)
...
...
@@ -186,10 +184,10 @@ def query_user_learn_course_details(data_dt):
,t.team_id
,p.course_id
,p.course_name
,
q.course_type
,
coalesce(q.course_type, '免费') as course_type
,p.total_dur
,p.learn_dur
,p.play_rate
,p.play_rate
,p.learn_classes
,p.total_classes
,p.online_time
...
...
@@ -209,6 +207,7 @@ def query_user_learn_course_details(data_dt):
when ab_pay_mode = '99' and ab_order_type = 0 then '系统赠送'
when ab_pay_mode = '99' and ab_order_type = 2 then '好友赠送'
end course_type
,count(1)
from tamp_user.order_flow
where deletetag = '0'
and ab_type = '300'
...
...
@@ -216,6 +215,7 @@ def query_user_learn_course_details(data_dt):
and ab_status = 'SUCCESS'
and ab_proid is not null
and group_buy_status in (1, 2)
group by createby, ab_proid, course_type # 排除用户多次下单(同一个课程,存在多次下单的情况)
) q
on p.user_id = q.user_id
and p.course_id = q.course_id
...
...
@@ -237,7 +237,8 @@ def query_user_learn_course_summary():
,q.nickname
,q.team_id
,p.course_id
,p.course_id
,p.course_name
,coalesce(q.course_type, '免费') as course_type
,p.learn_dur
,p.total_dur
,if(p.play_rate >=100.00, 100.00, p.play_rate) as play_rate
...
...
@@ -266,6 +267,7 @@ def query_user_learn_course_summary():
when ab_pay_mode = '99' and ab_order_type = 0 then '系统赠送'
when ab_pay_mode = '99' and ab_order_type = 2 then '好友赠送'
end course_type
,count(1)
from tamp_user.order_flow
where deletetag = '0'
and ab_type = '300'
...
...
@@ -273,6 +275,7 @@ def query_user_learn_course_summary():
and ab_status = 'SUCCESS'
and ab_proid is not null
and group_buy_status in (1, 2)
group by createby, ab_proid, course_type # 排除用户多次下单(同一个课程,存在多次下单的情况)
) t
on p.user_id = t.user_id
and p.course_id = t.course_id
...
...
@@ -350,7 +353,7 @@ def merge_learn_course(learn_course_dict, invite_dict):
if
__name__
==
'__main__'
:
import
datetime
begin
=
datetime
.
date
(
2021
,
9
,
14
)
begin
=
datetime
.
date
(
2021
,
4
,
14
)
end
=
datetime
.
date
(
2021
,
9
,
14
)
data_dt
=
begin
delta
=
datetime
.
timedelta
(
days
=
1
)
...
...
edw/dwd/user/dwd_user_content_order.py
0 → 100644
View file @
2d7aaa52
# -*- coding: utf-8 -*-
'''
内容订单数据(全量同步,订单量多了,再用增量),定时任务,每10分钟运行一次
'''
import
logging
import
os
import
sys
from
common.file_uitil
import
get_file_path
,
get_file_name
from
common.mysql_uitl
import
fetch_all
,
insert_batch
,
save_result
logging
.
basicConfig
(
format
=
"
%(asctime)
s
%(name)
s:
%(levelname)
s:
%(message)
s"
,
datefmt
=
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
level
=
logging
.
INFO
)
# file_path = get_file_path()
file_name
=
sys
.
argv
[
0
]
def dwd_user_content_order():
    '''Query the DWD content-order rows and hand them to ``save_result``.

    The rows come from ``query_dwd_user_content_order`` and are passed to
    ``save_result`` together with the target schema ``tamp_data_dwd``, the
    target table ``dwd_user_content_order`` and the module-level
    ``file_name`` (``sys.argv[0]``).
    '''
    # Fetch first, then save, so a failing query never reaches save_result.
    order_rows = query_dwd_user_content_order()
    target_schema, target_table = 'tamp_data_dwd', 'dwd_user_content_order'
    save_result(target_schema, target_table, order_rows, file_name)
def query_dwd_user_content_order():
    '''Fetch content orders from the order flow, enriched with user info.

    Reads ``tamp_user.order_flow`` (alias ``o``) and left-joins
    ``tamp_analysis.user_info_view`` (alias ``u``) on
    ``o.createby = u.user_id``.  Rows are kept when ``deletetag = '0'``,
    ``ab_proid`` is neither empty nor NULL, ``is_sand_box <> 1`` and
    ``group_buy_status in (1, 2)``; the result is ordered by ``data_dt``
    descending.  The statement derives ``payment_amount`` from
    ``ab_pay_mode`` and maps ``ab_pay_mode``, ``ab_type`` and
    ``ab_order_type`` to display labels.

    Returns whatever ``fetch_all(sql, None)`` returns.

    NOTE(review): the inline SQL comment says ``group_buy_status`` only has
    a value for group-buy orders (``ab_order_type = 3``); if it is NULL for
    other order types, the ``in (1, 2)`` filter drops them -- confirm the
    column default.
    NOTE(review): ``u.telephone`` is selected unmasked here; downstream
    layers that must hide phone numbers need to drop or mask it.
    '''
    # The name of this function is resolved at runtime and used as the
    # prefix of the start/success log lines.
    current_frame = sys._getframe()
    function_name = current_frame.f_code.co_name
    logging.info(f'{function_name} start')

    sql = '''
select date_format(o.createtime, '%Y-%m-%d') as data_dt -- 下单日期
,o.createby as user_id
,u.real_name
,u.user_name
,u.nickname
,u.telephone
,u.team_id
,o.id
,o.ab_ordernum as order_id -- 订单id
,case when o.ab_pay_mode in ('1','99') then 0
when o.ab_pay_mode = '2' then coalesce(ab_score_deduct,ab_account_deduct,0)
when o.ab_pay_mode = '3' then (ab_price / 100)
when o.ab_pay_mode in ('4','5','6','7','8') then (ab_payment / 100)
end as payment_amount
,o.ab_pay_mode
,case when o.ab_pay_mode = '1' then '免费'
when o.ab_pay_mode = '2' then '积分支付'
when o.ab_pay_mode = '3' then '现金支付'
when o.ab_pay_mode = '4' then '探普贝支付'
when o.ab_pay_mode = '5' then '微信支付'
when o.ab_pay_mode = '6' then '支付宝支付'
when o.ab_pay_mode = '7' then '苹果支付'
when o.ab_pay_mode = '8' then '微信支付(公众号)'
when o.ab_pay_mode = '99' and ab_order_type = 0 then '系统赠送'
when o.ab_pay_mode = '99' and ab_order_type = 2 then '好友赠送'
end as pay_mode
,o.ab_type as res_type
,case when o.ab_type = '1' then '购买栏目'
when o.ab_type = '3' then '购买直播'
when o.ab_type = '4' then '购买老版视频课程'
when o.ab_type = '5' then '购买老版音频课程'
when o.ab_type = '6' then '购买探普贝'
when o.ab_type = '7' then '购买课件'
when o.ab_type = '300' then '购买新版课程'
when o.ab_type = '305' then '购买新版课程课件'
when o.ab_type = '323' then '线下活动报名'
end res_type_name
,case when o.ab_order_type = 0 then '普通订单'
when o.ab_order_type = 1 then '赠送型订单'
when o.ab_order_type = 2 then '领取型订单'
when o.ab_order_type = 3 then '拼团型订单'
end as order_type -- 订单类型 0正常订单 1赠送型订单 2领取型订单 3拼团型订单,
,o.ab_proid as res_id
,o.ab_status as order_status
-- ,o.ab_order_type -- 订单类型 0正常订单 1赠送型订单 2领取型订单 3拼团型订单,
,o.present_status -- (ab_order_type为1时有值)赠送状态 0赠送中 1赠送完成
,o.group_buy_status -- (ab_order_type为3时有值)赠送状态 0拼团失败 1拼团成功 2拼团进行中
,o.order_number -- 订单份数
,o.createtime as create_time -- 订单创建时间
,o.pay_time -- 订单支付时间
,o.complete_time -- 订单完成时间
from tamp_user.order_flow o
left join tamp_analysis.user_info_view u
on o.createby = u.user_id
where o.deletetag = '0'
and o.ab_proid <> ''
and o.ab_proid is not null
and o.is_sand_box <> 1
and o.group_buy_status in (1, 2)
order by data_dt desc
'''

    # No bind parameters are passed (args is None), so the literal
    # '%Y-%m-%d' in the statement is handed to fetch_all unchanged.
    order_rows = fetch_all(sql, None)
    logging.info(f'{function_name} success')
    return order_rows
if __name__ == "__main__":
    # Script entry point: run one full sync of the DWD content-order table.
    # A commented-out per-day backfill loop used to follow this call; it was
    # dropped because it invoked dwd_user_content_order(data_dt), while the
    # function takes no arguments.
    dwd_user_content_order()
\ No newline at end of file
edw/dws/user/dws_user_content_order.py
0 → 100644
View file @
2d7aaa52
This diff is collapsed.
Click to expand it.
edw/tasks/scheduled_tasks.py
View file @
2d7aaa52
...
...
@@ -2,6 +2,9 @@
import
logging
import
sys
from
edw.ads.user.ads_user_content_order_records
import
ads_user_content_order_records
from
edw.dwd.user.dwd_user_content_order
import
dwd_user_content_order
from
edw.dws.user.dws_user_content_order
import
dws_user_content_order
'''
调度任务,分层ods,dwd,dws,ads 分层调度
...
...
@@ -36,9 +39,37 @@ from edw.ods.user.ods_users_info import ods_users_info
logging
.
basicConfig
(
format
=
"
%(asctime)
s
%(name)
s:
%(levelname)
s:
%(message)
s"
,
datefmt
=
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
level
=
logging
.
INFO
)
file_name
=
sys
.
argv
[
0
]
#按分钟定时
run_minute_time
=
get_run_time
(
30
)
# 滞后30分钟,去取时间。时间取值范围有全量,还有按天计算的(按天计算时,时间范围取值都是从0点~24点)
data_minute_dt
=
run_minute_time
[
0
:
10
]
# 延迟2.5 小时,去取数,因为定时任务是每隔2个小时运行
run_time
=
get_run_time
(
60
*
2.5
)
data_dt
=
run_time
[
0
:
10
]
run_hour_time
=
get_run_time
(
60
*
2.5
)
data_dt
=
run_hour_time
[
0
:
10
]
def dwd_task_minute():
    '''Minute-level DWD job: run ``dwd_user_content_order`` once.

    Logs ``<function name> start`` before and ``<function name> end`` after
    the call; the end line is only reached when the call does not raise.
    '''
    current_frame = sys._getframe()
    function_name = current_frame.f_code.co_name
    logging.info(f'{function_name} start')

    # Content order data.
    dwd_user_content_order()

    logging.info(f'{function_name} end')
def dws_task_minute():
    '''Minute-level DWS job: run ``dws_user_content_order`` once.

    Logs ``<function name> start`` before and ``<function name> end`` after
    the call; the end line is only reached when the call does not raise.

    NOTE(review): nothing in this function waits for the DWD minute job, so
    layer ordering depends entirely on how the scheduler registers the
    jobs -- confirm that is intended.
    '''
    current_frame = sys._getframe()
    function_name = current_frame.f_code.co_name
    logging.info(f'{function_name} start')

    # Content order data.
    dws_user_content_order()

    logging.info(f'{function_name} end')
def ads_task_minute():
    '''Minute-level ADS job: run ``ads_user_content_order_records`` once.

    Logs ``<function name> start`` before and ``<function name> end`` after
    the call; the end line is only reached when the call does not raise.

    NOTE(review): nothing in this function waits for the DWS minute job, so
    layer ordering depends entirely on how the scheduler registers the
    jobs -- confirm that is intended.
    '''
    current_frame = sys._getframe()
    function_name = current_frame.f_code.co_name
    logging.info(f'{function_name} start')

    # Content order data.
    ads_user_content_order_records()

    logging.info(f'{function_name} end')
def
ods_task_hour
():
...
...
@@ -89,6 +120,12 @@ def ads_task_hour():
if
__name__
==
'__main__'
:
scheduler
=
BlockingScheduler
()
# 按分定时调度
scheduler
.
add_job
(
dwd_task_minute
,
"interval"
,
minutes
=
10
)
scheduler
.
add_job
(
dws_task_minute
,
"interval"
,
minutes
=
10
)
scheduler
.
add_job
(
ads_task_minute
,
"interval"
,
minutes
=
10
)
# 两个小时定时调度
# scheduler.add_job(ods_task_hour, "interval", hours=2)
scheduler
.
add_job
(
dwd_task_hour
,
"interval"
,
hours
=
2
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment