Commit 99547ea3 authored by 侯双强

新增

parent 8e5e23e6
......@@ -154,43 +154,9 @@ def save_result(database, table, result_dict, file_name):
def save_etl_log(database, table, data_dt, row, status, task_file, now_time):
    '''
    Record one ETL run in tamp_data_dwd.dwd_etl_log.

    The row is written with ``replace into``, so an existing row with the
    same unique key is overwritten.

    :param database: name of the database the task wrote to
    :param table: name of the table the task wrote to
    :param data_dt: data date of the run
    :param row: number of rows reported by the task
    :param status: status text of the run
    :param task_file: file that executed the task
    :param now_time: timestamp of the run
    :return: None
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # Values are bound by the driver instead of being formatted into the SQL
    # text, so quotes/backslashes in task_file cannot break the statement and
    # a missing row count is stored as NULL rather than the literal "None".
    # NOTE(review): assumes insert() forwards its second argument to the
    # driver's execute(), the way fetch_all(sql, params) is used elsewhere.
    sql = '''replace into tamp_data_dwd.dwd_etl_log (data_dt,`database`,`table`,`rows`,`status`,task_file,run_time)
    values(%s, %s, %s, %s, %s, %s, %s)'''
    insert(sql, (data_dt, database, table, row, status, task_file, now_time))
# def upsert(database, table, result_dict, file_name):
# '''
# 更新或插入,存在则更新,不存在,则插入
# :param database:
# :param table:
# :param result_dict:
# :param file_name:
# :return:
# '''
# if result_dict:
# result_table = database + '.' + table
# logging.info(f'upsert {result_table} start')
# row = 0
# fields = None
# place_holder = None
# values = list()
# for i in result_dict:
# row = row + 1
# fields = ','.join([f"`{k}`" for k in i.keys()])
# place_holder = ','.join(["%s" for _ in i.keys()])
# values.append(tuple(i.values()))
#
#
# sql = f'''replace into {result_table} ( {fields} ) values ( {place_holder} )'''
# rs = insert_batch(sql, values)
# if rs == row or rs == 2 * row: # 因为这里用的是replace
# logging.info(f'upsert {result_table} success {row}')
# else:
# logging.error(f'upsert {result_table} error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')
def rows_to_dict_list(cursor):
    '''
    Convert the remaining rows of a DB-API cursor into a list of dicts.

    :param cursor: an executed DB-API cursor
    :return: one ``{column_name: value}`` dict per row; an empty list when
        the statement produced no result set
    '''
    description = cursor.description
    if description is None:
        # DB-API cursors report description=None for statements that return
        # no rows (DDL, inserts, ...); there is nothing to convert.
        return []
    # The first item of every description entry is the column name.
    columns = [column[0] for column in description]
    return [dict(zip(columns, row)) for row in cursor]
\ No newline at end of file
logging.info(f'{function_name} success')
......@@ -8,7 +8,7 @@ import json
import logging
import os
import sys
import time
import datetime
import requests
......@@ -74,7 +74,6 @@ def query_app_install(data_dt):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 9, 19)
end = datetime.date(2021, 9, 22)
data_dt = begin
......
......@@ -108,7 +108,7 @@ def query_level_dau(data_dt):
) t
on p.data_dt = t.data_dt
'''
level_dau_dict = fetch_all(sql, data_dt)
level_dau_dict = fetch_all(sql, (data_dt, data_dt, data_dt))
logging.info(f'{function_name} success')
return level_dau_dict
......@@ -188,7 +188,7 @@ def query_level_wau(data_dt):
) t
on p.run_dt = t.data_dt
'''
platform_wau_dict = fetch_all(sql, (start_date, end_date, start_date, end_date))
platform_wau_dict = fetch_all(sql, (start_date, end_date, data_dt, start_date, end_date, start_date, end_date, data_dt, end_date))
logging.info(f'{function_name} success')
return platform_wau_dict
......@@ -247,7 +247,7 @@ def query_level_mau(data_dt):
select date_format(data_dt, '%%Y-%%m') as data_dt
,user_id
,max(level_grade) as level_grade
from tamp_analysis.user_login_area
from tamp_data_dwd.dwd_user_login_environment
where data_dt between %s and %s
group by date_format(data_dt, '%%Y-%%m'),user_id
) o
......@@ -268,15 +268,15 @@ def query_level_mau(data_dt):
) t
on p.run_dt = t.data_dt
'''
platform_mau_dict = fetch_all(sql, (start_date, end_date))
platform_mau_dict = fetch_all(sql, (data_dt, start_date, end_date, data_dt, data_dt))
logging.info(f'{function_name} success')
return platform_mau_dict
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 1, 1)
end = datetime.date(2021, 9, 17)
begin = datetime.date(2021, 9, 22)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -100,7 +100,7 @@ def query_single_course_invite_people_summary():
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = '''
select p.data_dt
select max(p.data_dt) as data_dt
,p.user_id
,t.package_id as course_id
,count(distinct p.user_id) as invite_num
......@@ -117,7 +117,7 @@ def query_single_course_invite_people_summary():
) p
left join tamp_analysis.course_res_view t
on p.res_id = t.id
group by p.data_dt, p.user_id, t.package_id
group by p.user_id, t.package_id
'''
single_course_invite_dict = fetch_all(sql, None)
logging.info(f'{function_name} success')
......@@ -129,13 +129,13 @@ def query_course_invite_people_summary():
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = '''
select data_dt
select max(data_dt) as data_dt
,source_user_id as user_id
,res_id as course_id
,count(distinct user_id) as invite_num
from tamp_data_dwd.dwd_user_visit_clues
where current_page in ('p10503', 'p10504', 'p10507')
group by data_dt,source_user_id,res_id
group by source_user_id,res_id
'''
course_invite_dict = fetch_all(sql, None)
logging.info(f'{function_name} success')
......@@ -250,6 +250,7 @@ def query_user_learn_course_summary():
,p.total_dur
,if(p.play_rate >=100.00, 100.00, p.play_rate) as play_rate
,coalesce(p.share_num, 0) as share_num
,p.online_time
from
(
select max(data_dt) as data_dt
......@@ -258,7 +259,7 @@ def query_user_learn_course_summary():
,course_name
,max(total_dur) as total_dur # 后面可能有新课程增加
,sum(learn_dur) as learn_dur
,round(sum(learn_dur) / total_dur * 100, 2) as play_rate
,round(sum(learn_dur) / max(total_dur) * 100, 2) as play_rate
,online_time
,sum(share_num) as share_num
from tamp_data_dws.dws_user_learn_course
......@@ -360,8 +361,8 @@ def merge_learn_course(learn_course_dict, invite_dict):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 16)
begin = datetime.date(2021, 9, 17)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -8,7 +8,7 @@ import json
import logging
import os
import sys
import time
import datetime
import requests
......@@ -65,7 +65,6 @@ def get_app_install(url, app_type):
if __name__ == '__main__':
# dwd_app_install(data_dt)
import datetime
begin = datetime.date(2021, 9, 18)
end = datetime.date(2021, 9, 22)
data_dt = begin
......
......@@ -97,13 +97,12 @@ def query_dwd_user_content_order():
if __name__ == '__main__':
dwd_user_content_order()
# import datetime
# begin = datetime.date(2021, 9, 14)
# end = datetime.date(2021, 9, 14)
# data_dt = begin
# delta = datetime.timedelta(days=1)
# while data_dt <= end:
# print(data_dt.strftime("%Y-%m-%d"))
# dwd_user_content_order(data_dt)
# data_dt += delta
\ No newline at end of file
import datetime
begin = datetime.date(2021, 9, 14)
end = datetime.date(2021, 9, 14)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dwd_user_content_order(data_dt)
data_dt += delta
\ No newline at end of file
......@@ -104,8 +104,8 @@ def save_dwd_user_learn_course(ret):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 4, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -8,7 +8,7 @@ import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
......@@ -23,9 +23,9 @@ def dwd_user_login_area(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
login_area_dict = query_user_login_area(start_time, end_time)
row = save_result('tamp_data_dwd', 'save_dwd_user_login_area', login_area_dict, file_name)
row = save_result('tamp_data_dwd', 'dwd_user_login_area', login_area_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'save_dwd_user_login_area', data_dt, row, 'done', task_file, now_time)
save_etl_log('tamp_data_dwd', 'dwd_user_login_area', data_dt, row, 'done', task_file, now_time)
def query_user_login_area(start_time, end_time):
......@@ -95,7 +95,7 @@ if __name__ == '__main__':
# dwd_user_login_environment(data_dt)
import datetime
begin = datetime.date(2021, 9, 17)
end = datetime.date(2021, 9, 17)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -8,7 +8,6 @@ import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
......@@ -38,6 +37,27 @@ def query_user_login_env(start_time, end_time):
,coalesce(t.user_name, '') as user_name
,coalesce(t.nickname, '') as nickname
,t.team_id
,case when p.data_dt >= date_format(t.senior_adviser_time, '%%Y-%%m-%%d') then 40
when p.data_dt < date_format(t.senior_adviser_time, '%%Y-%%m-%%d')
and p.data_dt >= date_format(t.advisor_time, '%%Y-%%m-%%d') then 30
when t.senior_adviser_time is null and p.data_dt >= date_format(t.advisor_time, '%%Y-%%m-%%d') then 30
when p.data_dt < date_format(t.advisor_time, '%%Y-%%m-%%d')
and p.data_dt >= date_format(t.practitioner_time, '%%Y-%%m-%%d') then 20
when t.advisor_time is null and p.data_dt >= date_format(t.practitioner_time, '%%Y-%%m-%%d') then 20
when p.data_dt < date_format(t.practitioner_time, '%%Y-%%m-%%d')
and p.data_dt >= date_format(t.investor_time, '%%Y-%%m-%%d') then 10
when t.practitioner_time is null and p.data_dt >= date_format(t.investor_time, '%%Y-%%m-%%d') then 10
when p.data_dt < date_format(t.practitioner_time, '%%Y-%%m-%%d')
and p.data_dt >= date_format(t.register_time, '%%Y-%%m-%%d') then 1
when p.data_dt < date_format(t.investor_time, '%%Y-%%m-%%d')
and p.data_dt >= date_format(t.register_time, '%%Y-%%m-%%d') then 1
when t.practitioner_time is null and investor_time is null
and p.data_dt >= date_format(t.register_time, '%%Y-%%m-%%d') then 1
when p.data_dt < date_format(t.register_time, '%%Y-%%m-%%d')
and p.data_dt >= date_format(t.create_time, '%%Y-%%m-%%d') then 0
when t.register_time is null and p.data_dt >= date_format(t.create_time, '%%Y-%%m-%%d') then 0
else 0
end as level_grade
,p.env
,p.env_name
,p.start_time
......@@ -97,8 +117,8 @@ def save_dwd_user_login_env(ret):
if __name__ == '__main__':
# dwd_user_login_environment(data_dt)
import datetime
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 16)
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -6,7 +6,7 @@
import logging
import os
import sys
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.mysql_uitl import fetch_all, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
......
......@@ -8,8 +8,7 @@ import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.mysql_uitl import fetch_all, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
......@@ -23,7 +22,6 @@ def dwd_user_login_phone_mode(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
login_phone_dict = query_user_login_phone_mode(start_time, end_time)
save_dwd_user_login_phone_mode(login_phone_dict)
row = save_result('tamp_data_dwd', 'dwd_user_login_phone_mode', login_phone_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_login_phone_mode', data_dt, row, 'done', task_file, now_time)
......@@ -69,31 +67,10 @@ def query_user_login_phone_mode(start_time, end_time):
return login_phone_dict
def save_dwd_user_login_phone_mode(ret):
    '''
    Batch-write login phone-mode records into
    tamp_data_dwd.dwd_user_login_phone_mode with ``replace into``.

    :param ret: iterable of dicts, one per record; nothing happens when falsy
    :return: None
    '''
    if not ret:
        return

    logging.info('save_dwd_user_login_phone_mode start')
    fields = None
    place_holder = None
    values = []
    for record in ret:
        # Column list and placeholders are rebuilt per record, so the last
        # record's keys are the ones that end up in the statement.
        fields = ','.join([f"`{k}`" for k in record.keys()])
        place_holder = ','.join(["%s" for _ in record.keys()])
        values.append(tuple(record.values()))
    row = len(values)

    sql = f'''replace into tamp_data_dwd.dwd_user_login_phone_mode ( {fields} ) values ( {place_holder} )'''
    rs = insert_batch(sql, values)
    # replace counts an overwritten row twice (delete + insert), so both
    # row and 2 * row are accepted as success.
    if rs not in (row, 2 * row):
        logging.error(f'save_dwd_user_login_phone_mode error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')
    else:
        logging.info(f'save_dwd_user_login_phone_mode success {row}')
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 22)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -154,8 +154,8 @@ def save_dwd_user_share_event(ret):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -327,4 +327,12 @@ def query_studio_add_self_article():
if __name__ == '__main__':
dwd_user_studio_add_content()
import datetime
begin = datetime.date(2021, 9, 22)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dwd_user_studio_add_content(data_dt)
data_dt += delta
......@@ -152,8 +152,8 @@ def save_dwd_user_visit_clues(ret):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -22,9 +22,9 @@ def dwd_user_watch_live(data_dt):
end_time = str(data_dt) + ' 23:59:59'
user_watch_dict = query_dwd_user_watch_live(start_time, end_time)
# save_dwd_user_watch_live(user_watch_dict)
row = save_result('tamp_data_dwd', 'dwd_user_watch', user_watch_dict, file_name)
row = save_result('tamp_data_dwd', 'dwd_user_watch_live', user_watch_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_watch', data_dt, row, 'done', task_file, now_time)
save_etl_log('tamp_data_dwd', 'dwd_user_watch_live', data_dt, row, 'done', task_file, now_time)
def query_dwd_user_watch_live(start_time, end_time):
......@@ -105,8 +105,8 @@ def save_dwd_user_watch_live(ret):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -77,7 +77,7 @@ def query_dwd_user_watch_short_video(start_time, end_time):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 21)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -65,8 +65,8 @@ def query_dws_user_browse_fund(data_dt):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -379,4 +379,13 @@ def query_dws_offline_activity_order():
if __name__ == '__main__':
dws_user_content_order()
import datetime
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dws_user_content_order(data_dt)
data_dt += delta
......@@ -79,4 +79,13 @@ def save_dws_user_learn_total_dur(ret):
if __name__ == '__main__':
dws_user_learn_total_dur()
import datetime
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dws_user_learn_total_dur(data_dt)
data_dt += delta
......@@ -112,4 +112,12 @@ def update_dws_user_recent_login_env(ret):
if __name__ == '__main__':
dws_user_login_environment()
import datetime
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dws_user_login_environment(data_dt)
data_dt += delta
......@@ -73,4 +73,12 @@ def save_dws_user_login_phone_mode(ret):
if __name__ == '__main__':
dws_user_login_phone_mode()
import datetime
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dws_user_login_phone_mode(data_dt)
data_dt += delta
......@@ -8,9 +8,9 @@ import json
import logging
import os
import sys
import datetime
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from datetime import datetime
from common.time_util import YMDHMS_FORMAT, now_str
......@@ -108,9 +108,8 @@ def save_dws_user_login_area(ret):
if __name__ == '__main__':
# dws_user_login_area()
import datetime
begin = datetime.date(2021, 9, 17)
end = datetime.date(2021, 9, 18)
begin = datetime.date(2021, 9, 19)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -106,8 +106,8 @@ def merge_short_video_dict(watch_short_video_dict, share_short_video_dict):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -60,7 +60,7 @@ run_hour_time = get_run_time(60 * 2.5)
data_dt = run_hour_time[0: 10]
def dwd_task_minute():
def dwd_task_minute(data_dt):
'''
按分钟定时调度任务
'''
......@@ -71,7 +71,7 @@ def dwd_task_minute():
logging.info(f'{function_name} end')
def dws_task_minute():
def dws_task_minute(data_dt):
'''
按分钟定时调度任务
'''
......@@ -82,7 +82,7 @@ def dws_task_minute():
logging.info(f'{function_name} end')
def ads_task_minute():
def ads_task_minute(data_dt):
'''
按分钟定时调度任务
'''
......@@ -93,7 +93,7 @@ def ads_task_minute():
logging.info(f'{function_name} end')
def ods_task_hour():
def ods_task_hour(data_dt):
'''
按小时定时调度任务
'''
......@@ -105,7 +105,7 @@ def ods_task_hour():
def dwd_task_hour():
def dwd_task_hour(data_dt):
'''
按小时定时调度任务
'''
......@@ -125,7 +125,7 @@ def dwd_task_hour():
logging.info(f'{function_name} end')
def dws_task_hour():
def dws_task_hour(data_dt):
'''
按小时定时调度任务
'''
......@@ -143,7 +143,7 @@ def dws_task_hour():
logging.info(f'{function_name} end')
def ads_task_hour():
def ads_task_hour(data_dt):
'''
按小时定时调度任务
'''
......@@ -159,7 +159,7 @@ def ads_task_hour():
def dwd_task_day():
def dwd_task_day(data_dt):
'''
按天定时调度任务
'''
......@@ -172,7 +172,7 @@ def dwd_task_day():
logging.info(f'{function_name} end')
def dws_task_day():
def dws_task_day(data_dt):
'''
按天定时调度任务
'''
......@@ -182,7 +182,7 @@ def dws_task_day():
logging.info(f'{function_name} end')
def ads_task_day():
def ads_task_day(data_dt):
'''
按天定时调度任务
'''
......@@ -194,22 +194,49 @@ def ads_task_day():
logging.info(f'{function_name} end')
def minute_task():
    '''
    Run the minute-level pipeline: DWD, then DWS, then ADS, all for the same
    date, taken from the run time 10 minutes in the past.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # First 10 characters of the run-time string are used as the data date.
    data_dt = get_run_time(10)[0: 10]
    for layer_task in (dwd_task_minute, dws_task_minute, ads_task_minute):
        layer_task(data_dt)
    logging.info(f'{function_name} end')
def hour_task():
    '''
    Run the hour-level pipeline: DWD, then DWS, then ADS, all for the same
    date, taken from the run time 10 minutes in the past.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # First 10 characters of the run-time string are used as the data date.
    data_dt = get_run_time(10)[0: 10]
    for layer_task in (dwd_task_hour, dws_task_hour, ads_task_hour):
        layer_task(data_dt)
    logging.info(f'{function_name} end')
def day_task():
    '''
    Run the day-level pipeline: DWD, then DWS, then ADS, all for the same
    date, taken from the run time 10 minutes in the past.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # First 10 characters of the run-time string are used as the data date.
    # NOTE(review): when this is triggered shortly after midnight, a
    # 10-minute lag still lands on the current calendar day - confirm that
    # the daily jobs are not meant to process the previous day instead.
    data_dt = get_run_time(10)[0: 10]
    for layer_task in (dwd_task_day, dws_task_day, ads_task_day):
        layer_task(data_dt)
    logging.info(f'{function_name} end')
if __name__ == '__main__':
scheduler = BlockingScheduler()
# 按分定时调度
scheduler.add_job(dwd_task_minute, "interval", minutes=10)
scheduler.add_job(dws_task_minute, "interval", minutes=10)
scheduler.add_job(ads_task_minute, "interval", minutes=10)
# 两个小时定时调度
# scheduler.add_job(ods_task_hour, "interval", hours=2)
scheduler.add_job(dwd_task_hour, "interval", minutes=30)
scheduler.add_job(dws_task_hour, "interval", minutes=30)
scheduler.add_job(ads_task_hour, "interval", minutes=30)
# 按天定时任务
scheduler.add_job(dwd_task_day, "interval", hours=1)
scheduler.add_job(dws_task_day, "interval", hours=1)
scheduler.add_job(ads_task_day, "interval", hours=1)
scheduler.add_job(minute_task, 'cron', hour='0-23', minute='*/5')
# 每两小时调度
scheduler.add_job(hour_task, 'cron', hour='*/2', minute='5')
# 按天定时任务(每天2点定时运行)
scheduler.add_job(day_task, "cron", hour='2')
scheduler.start()
# -*- coding: utf-8 -*-
import logging
import sys
from edw.dws.user.dws_user_content_order import dws_user_content_order
'''
调度任务,分层ods,dwd,dws,ads 分层调度
调度又根据业务需求,分为按小时调度和按天调度
'''
from apscheduler.schedulers.blocking import BlockingScheduler
from common.time_util import get_run_time
from edw.ads.basic.ads_app_install import ads_app_install
from edw.ads.basic.ads_platform_active import ads_platform_active
from edw.ads.basic.ads_user_level_active import ads_user_level_active
from edw.ads.user.ads_user_content_order_records import ads_user_content_order_records
from edw.ads.user.ads_user_watch_short_video import ads_user_watch_short_video
from edw.dwd.basic.dwd_app_install import dwd_app_install
from edw.dwd.user.dwd_user_content_order import dwd_user_content_order
from edw.dwd.user.dwd_user_first_login_client_time import dwd_user_first_login_client_time
from edw.dwd.user.dwd_user_login_pc_record import dwd_user_login_pc_record
from edw.dwd.user.dwd_user_studio_add_content import dwd_user_studio_add_content
from edw.dwd.user.dwd_user_watch_short_video import dwd_user_watch_short_video
from edw.dws.user.dws_user_first_login_client_time import dws_user_first_login_client_time
from edw.dws.user.dws_user_watch_short_video import dws_user_watch_short_video
from edw.ads.user.ads_user_basic_behavior import ads_user_basic_behavior
from edw.ads.user.ads_user_browse_fund import ads_user_browse_fund
from edw.ads.user.ads_user_learn_course import ads_user_learn_course
from edw.ads.user.ads_user_watch_live import ads_user_watch_live
from edw.dwd.user.dwd_user_browse_fund import dwd_user_browse_fund
from edw.dwd.user.dwd_user_learn_course import dwd_user_learn_course
from edw.dwd.user.dwd_user_login_area import dwd_user_login_area
from edw.dwd.user.dwd_user_login_environment import dwd_user_login_environment
from edw.dwd.user.dwd_user_login_phone_mode import dwd_user_login_phone_mode
from edw.dwd.user.dwd_user_share_event import dwd_user_share_event
from edw.dwd.user.dwd_user_visitor_clues import dwd_user_visitor_clues
from edw.dwd.user.dwd_user_watch_live import dwd_user_watch_live
from edw.dws.user.dws_user_browse_fund import dws_user_browse_fund
from edw.dws.user.dws_user_learn_course import dws_user_learn_course
from edw.dws.user.dws_user_learn_total_dur import dws_user_learn_total_dur
from edw.dws.user.dws_user_login_top_area import dws_user_login_top_area
from edw.dws.user.dws_user_login_environment import dws_user_login_environment
from edw.dws.user.dws_user_login_phone_mode import dws_user_login_phone_mode
from edw.dws.user.dws_user_watch_live import dws_user_watch_live
from edw.ods.user.ods_users_info import ods_users_info
from edw.dwd.user.dwd_user_community_res import dwd_user_community_res
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
# 按分钟定时
run_minute_time = get_run_time(30) # 滞后30分钟,去取时间。时间取值范围有全量,还有按天计算的(按天计算时,时间范围取值都是从0点~24点)
data_minute_dt = run_minute_time[0: 10]
# 延迟2.5 小时,去取数,因为定时任务是每隔2个小时运行
run_hour_time = get_run_time(60 * 2.5)
data_dt = run_hour_time[0: 10]
def dwd_task_minute(data_dt=None):
    '''
    Minute-level scheduled task for the DWD layer: content order data.

    :param data_dt: date to process (first 10 characters of a run-time
        string). When omitted it is derived at call time with the same
        2.5-hour lag the module-level default uses.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    if data_dt is None:
        # The module-level data_dt is evaluated once, at import. Inside a
        # blocking scheduler it would never advance, so resolve it per run.
        data_dt = get_run_time(60 * 2.5)[0: 10]
    # Content order data
    dwd_user_content_order(data_dt)
    logging.info(f'{function_name} end')
def dws_task_minute(data_dt=None):
    '''
    Minute-level scheduled task for the DWS layer: content order data.

    :param data_dt: date to process (first 10 characters of a run-time
        string). When omitted it is derived at call time with the same
        2.5-hour lag the module-level default uses.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    if data_dt is None:
        # The module-level data_dt is evaluated once, at import. Inside a
        # blocking scheduler it would never advance, so resolve it per run.
        data_dt = get_run_time(60 * 2.5)[0: 10]
    # Content order data
    dws_user_content_order(data_dt)
    logging.info(f'{function_name} end')
def ads_task_minute(data_dt=None):
    '''
    Minute-level scheduled task for the ADS layer: content order records.

    :param data_dt: date to process (first 10 characters of a run-time
        string). When omitted it is derived at call time with the same
        2.5-hour lag the module-level default uses.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    if data_dt is None:
        # The module-level data_dt is evaluated once, at import. Inside a
        # blocking scheduler it would never advance, so resolve it per run.
        data_dt = get_run_time(60 * 2.5)[0: 10]
    # Content order data
    ads_user_content_order_records(data_dt)
    logging.info(f'{function_name} end')
def ods_task_hour():
    '''
    Hour-level scheduled task for the ODS layer: refreshes user information.
    Logs a start and an end message around the single job it runs.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    ods_users_info()  # user information
    logging.info(f'{function_name} end')
def dwd_task_hour(data_dt=None):
    '''
    Hour-level scheduled task for the DWD layer: runs every DWD user job,
    in the order listed, for one date.

    :param data_dt: date to process (first 10 characters of a run-time
        string). When omitted it is derived at call time with the same
        2.5-hour lag the module-level default uses.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    if data_dt is None:
        # The module-level data_dt is evaluated once, at import. Inside a
        # blocking scheduler it would never advance, so resolve it per run.
        data_dt = get_run_time(60 * 2.5)[0: 10]
    dwd_user_browse_fund(data_dt)
    dwd_user_community_res(data_dt)
    dwd_user_login_area(data_dt)
    dwd_user_learn_course(data_dt)
    dwd_user_login_environment(data_dt)
    dwd_user_login_phone_mode(data_dt)
    dwd_user_share_event(data_dt)
    dwd_user_studio_add_content(data_dt)
    dwd_user_visitor_clues(data_dt)
    dwd_user_watch_live(data_dt)
    dwd_user_watch_short_video(data_dt)
    logging.info(f'{function_name} end')
def dws_task_hour(data_dt=None):
    '''
    Hour-level scheduled task for the DWS layer: runs every DWS user job,
    in the order listed, for one date.

    :param data_dt: date to process (first 10 characters of a run-time
        string). When omitted it is derived at call time with the same
        2.5-hour lag the module-level default uses.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    if data_dt is None:
        # The module-level data_dt is evaluated once, at import. Inside a
        # blocking scheduler it would never advance, so resolve it per run.
        data_dt = get_run_time(60 * 2.5)[0: 10]
    dws_user_login_top_area(data_dt)
    dws_user_login_phone_mode(data_dt)
    dws_user_learn_total_dur(data_dt)
    dws_user_first_login_client_time(data_dt)
    dws_user_login_environment(data_dt)
    dws_user_learn_course(data_dt)
    dws_user_browse_fund(data_dt)
    dws_user_watch_live(data_dt)
    dws_user_watch_short_video(data_dt)
    logging.info(f'{function_name} end')
def ads_task_hour(data_dt=None):
    '''
    Hour-level scheduled task for the ADS layer: runs every ADS user job,
    in the order listed, for one date.

    :param data_dt: date to process (first 10 characters of a run-time
        string). When omitted it is derived at call time with the same
        2.5-hour lag the module-level default uses.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    if data_dt is None:
        # The module-level data_dt is evaluated once, at import. Inside a
        # blocking scheduler it would never advance, so resolve it per run.
        data_dt = get_run_time(60 * 2.5)[0: 10]
    ads_user_basic_behavior(data_dt)
    ads_user_browse_fund(data_dt)
    ads_user_learn_course(data_dt)
    ads_user_watch_live(data_dt)
    ads_user_watch_short_video(data_dt)
    ads_user_content_order_records(data_dt)
    logging.info(f'{function_name} end')
def dwd_task_day(data_dt=None):
    '''
    Day-level scheduled task for the DWD layer: app installs, PC login
    records and first client-login times, in that order.

    :param data_dt: date to process (first 10 characters of a run-time
        string). When omitted it is derived at call time with the same
        2.5-hour lag the module-level default uses.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    if data_dt is None:
        # The module-level data_dt is evaluated once, at import. Inside a
        # blocking scheduler it would never advance, so resolve it per run.
        data_dt = get_run_time(60 * 2.5)[0: 10]
    dwd_app_install(data_dt)
    dwd_user_login_pc_record(data_dt)
    dwd_user_first_login_client_time(data_dt)
    logging.info(f'{function_name} end')
def dws_task_day():
    '''
    Day-level scheduled task for the DWS layer.

    No DWS job is registered here yet; the function only logs its own
    start and end.
    '''
    function_name = sys._getframe().f_code.co_name
    for marker in ('start', 'end'):
        logging.info(f'{function_name} {marker}')
def ads_task_day(data_dt=None):
    '''
    Day-level scheduled task for the ADS layer: user-level activity,
    platform activity and app installs, in that order.

    :param data_dt: date to process (first 10 characters of a run-time
        string). When omitted it is derived at call time with the same
        2.5-hour lag the module-level default uses.
    '''
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    if data_dt is None:
        # The module-level data_dt is evaluated once, at import. Inside a
        # blocking scheduler it would never advance, so resolve it per run.
        data_dt = get_run_time(60 * 2.5)[0: 10]
    ads_user_level_active(data_dt)
    ads_platform_active(data_dt)
    ads_app_install(data_dt)
    logging.info(f'{function_name} end')
if __name__ == '__main__':
    scheduler = BlockingScheduler()

    # Minute-level jobs: every 10 minutes.
    for job in (dwd_task_minute, dws_task_minute, ads_task_minute):
        scheduler.add_job(job, "interval", minutes=10)

    # Hour-level jobs: registered with a 30-minute interval.
    # scheduler.add_job(ods_task_hour, "interval", hours=2)
    for job in (dwd_task_hour, dws_task_hour, ads_task_hour):
        scheduler.add_job(job, "interval", minutes=30)

    # Day-level jobs: registered with a 1-hour interval.
    for job in (dwd_task_day, dws_task_day, ads_task_day):
        scheduler.add_job(job, "interval", hours=1)

    # Blocks the process and runs the registered jobs until stopped.
    scheduler.start()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment