# -*- coding: utf-8 -*-
import logging
import sys

from edw.dws.user.dws_user_content_order import dws_user_content_order

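'''
Scheduled tasks, organised by warehouse layer: ods, dwd, dws, ads.
Depending on business needs, scheduling is further split into hourly runs and daily runs.
'''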

from apscheduler.schedulers.blocking import BlockingScheduler

from common.time_util import get_run_time

from edw.ads.basic.ads_app_install import ads_app_install
from edw.ads.basic.ads_platform_active import ads_platform_active
from edw.ads.basic.ads_user_level_active import ads_user_level_active
from edw.ads.user.ads_user_content_order_records import ads_user_content_order_records
from edw.ads.user.ads_user_watch_short_video import ads_user_watch_short_video
from edw.dwd.basic.dwd_app_install import dwd_app_install
from edw.dwd.user.dwd_user_content_order import dwd_user_content_order
from edw.dwd.user.dwd_user_first_login_client_time import dwd_user_first_login_client_time
from edw.dwd.user.dwd_user_login_pc_record import dwd_user_login_pc_record
from edw.dwd.user.dwd_user_studio_add_content import dwd_user_studio_add_content
from edw.dwd.user.dwd_user_watch_short_video import dwd_user_watch_short_video
from edw.dws.user.dws_user_first_login_client_time import dws_user_first_login_client_time
from edw.dws.user.dws_user_watch_short_video import dws_user_watch_short_video
from edw.ads.user.ads_user_basic_behavior import ads_user_basic_behavior
from edw.ads.user.ads_user_browse_fund import ads_user_browse_fund
from edw.ads.user.ads_user_learn_course import ads_user_learn_course
from edw.ads.user.ads_user_watch_live import ads_user_watch_live
from edw.dwd.user.dwd_user_browse_fund import dwd_user_browse_fund
from edw.dwd.user.dwd_user_learn_course import dwd_user_learn_course
from edw.dwd.user.dwd_user_login_area import dwd_user_login_area
from edw.dwd.user.dwd_user_login_environment import dwd_user_login_environment
from edw.dwd.user.dwd_user_login_phone_mode import dwd_user_login_phone_mode
from edw.dwd.user.dwd_user_share_event import dwd_user_share_event
from edw.dwd.user.dwd_user_visitor_clues import dwd_user_visitor_clues
from edw.dwd.user.dwd_user_watch_live import dwd_user_watch_live
from edw.dws.user.dws_user_browse_fund import dws_user_browse_fund
from edw.dws.user.dws_user_learn_course import dws_user_learn_course
from edw.dws.user.dws_user_learn_total_dur import dws_user_learn_total_dur
from edw.dws.user.dws_user_login_top_area import dws_user_login_top_area
from edw.dws.user.dws_user_login_environment import dws_user_login_environment
from edw.dws.user.dws_user_login_phone_mode import dws_user_login_phone_mode
from edw.dws.user.dws_user_watch_live import dws_user_watch_live
from edw.ods.user.ods_users_info import ods_users_info
from edw.dwd.user.dwd_user_community_res import dwd_user_community_res

# Root logger: timestamped "<time> <logger>:<level>:<message>" lines at INFO.
logging.basicConfig(
    format="%(asctime)s %(name)s:%(levelname)s:%(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
file_name = sys.argv[0]

# Minute-level schedule.
# Take the run time lagged by 30 minutes. Time ranges are either full-history
# or per-day (per-day ranges always cover 00:00-24:00).
# NOTE(review): these module-level values are evaluated once at import time and
# are not read by any code visible in this file (the task functions receive
# data_dt as a parameter) -- confirm whether other modules rely on them.
run_minute_time = get_run_time(30)
data_minute_dt = run_minute_time[0:10]

# Lag by 2.5 hours when fetching data, because the scheduled job runs every
# 2 hours.
run_hour_time = get_run_time(60 * 2.5)
data_dt = run_hour_time[0:10]


def dwd_task_minute(data_dt):
    '''
    DWD-layer step of the minute-level schedule.

    Logs "<function name> start", runs dwd_user_content_order(data_dt) and
    then logs "<function name> end". An exception raised by the step
    propagates to the caller, in which case the end line is not logged.

    :param data_dt: passed through unchanged to the step. minute_task passes
        the first 10 characters of get_run_time(10) (presumably a YYYY-MM-DD
        date string -- confirm against common.time_util).
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # Content order data.
    dwd_user_content_order(data_dt)
    logging.info(f'{function_name} end')


def dws_task_minute(data_dt):
    '''
    DWS-layer step of the minute-level schedule.

    Logs "<function name> start", runs dws_user_content_order(data_dt) and
    then logs "<function name> end". An exception raised by the step
    propagates to the caller, in which case the end line is not logged.

    :param data_dt: passed through unchanged to the step. minute_task passes
        the first 10 characters of get_run_time(10) (presumably a YYYY-MM-DD
        date string -- confirm against common.time_util).
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # Content order data.
    dws_user_content_order(data_dt)
    logging.info(f'{function_name} end')


def ads_task_minute(data_dt):
    '''
    ADS-layer step of the minute-level schedule.

    Logs "<function name> start", runs ads_user_content_order_records(data_dt)
    and then logs "<function name> end". An exception raised by the step
    propagates to the caller, in which case the end line is not logged.

    :param data_dt: passed through unchanged to the step. minute_task passes
        the first 10 characters of get_run_time(10) (presumably a YYYY-MM-DD
        date string -- confirm against common.time_util).
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # Content order data.
    ads_user_content_order_records(data_dt)
    logging.info(f'{function_name} end')


def ods_task_hour(data_dt):
    '''
    ODS-layer step intended for the hourly schedule.

    Logs "<function name> start", runs ods_users_info() and then logs
    "<function name> end".

    NOTE(review): no caller of this function is visible in this file
    (hour_task does not invoke it) -- confirm whether the ODS refresh is
    scheduled elsewhere or was dropped unintentionally.

    :param data_dt: accepted for signature parity with the other layer tasks
        but not used; ods_users_info() is called without arguments.
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    # User info.
    ods_users_info()
    logging.info(f'{function_name} end')


def dwd_task_hour(data_dt):
    '''
    DWD-layer steps of the hourly schedule.

    Logs "<function name> start", runs each DWD user step below with data_dt,
    in the order written, and then logs "<function name> end". The steps run
    sequentially; an exception raised by one step propagates to the caller,
    so later steps and the end log line are skipped.

    :param data_dt: passed through unchanged to every step. hour_task passes
        the first 10 characters of get_run_time(10) (presumably a YYYY-MM-DD
        date string -- confirm against common.time_util).
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    dwd_user_browse_fund(data_dt)
    dwd_user_community_res(data_dt)
    dwd_user_login_area(data_dt)
    dwd_user_learn_course(data_dt)
    dwd_user_login_environment(data_dt)
    dwd_user_login_phone_mode(data_dt)
    dwd_user_share_event(data_dt)
    dwd_user_studio_add_content(data_dt)
    dwd_user_visitor_clues(data_dt)
    dwd_user_watch_live(data_dt)
    dwd_user_watch_short_video(data_dt)
    logging.info(f'{function_name} end')


def dws_task_hour(data_dt):
    '''
    DWS-layer steps of the hourly schedule.

    Logs "<function name> start", runs each DWS user step below with data_dt,
    in the order written, and then logs "<function name> end". The steps run
    sequentially; an exception raised by one step propagates to the caller,
    so later steps and the end log line are skipped.

    :param data_dt: passed through unchanged to every step. hour_task passes
        the first 10 characters of get_run_time(10) (presumably a YYYY-MM-DD
        date string -- confirm against common.time_util).
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    dws_user_login_top_area(data_dt)
    dws_user_login_phone_mode(data_dt)
    dws_user_learn_total_dur(data_dt)
    dws_user_first_login_client_time(data_dt)
    dws_user_login_environment(data_dt)
    dws_user_learn_course(data_dt)
    dws_user_browse_fund(data_dt)
    dws_user_watch_live(data_dt)
    dws_user_watch_short_video(data_dt)
    logging.info(f'{function_name} end')


def ads_task_hour(data_dt):
    '''
    ADS-layer steps of the hourly schedule.

    Logs "<function name> start", runs each ADS user step below with data_dt,
    in the order written, and then logs "<function name> end". The steps run
    sequentially; an exception raised by one step propagates to the caller,
    so later steps and the end log line are skipped.

    ads_user_content_order_records is also run by ads_task_minute.

    :param data_dt: passed through unchanged to every step. hour_task passes
        the first 10 characters of get_run_time(10) (presumably a YYYY-MM-DD
        date string -- confirm against common.time_util).
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    ads_user_basic_behavior(data_dt)
    ads_user_browse_fund(data_dt)
    ads_user_learn_course(data_dt)
    ads_user_watch_live(data_dt)
    ads_user_watch_short_video(data_dt)
    ads_user_content_order_records(data_dt)
    logging.info(f'{function_name} end')



def dwd_task_day(data_dt):
    '''
    DWD-layer steps of the daily schedule.

    Logs "<function name> start", runs dwd_app_install,
    dwd_user_login_pc_record and dwd_user_first_login_client_time with
    data_dt, in that order, and then logs "<function name> end". An exception
    raised by one step propagates to the caller, so later steps and the end
    log line are skipped.

    :param data_dt: passed through unchanged to every step. day_task passes
        the first 10 characters of get_run_time(10) (presumably a YYYY-MM-DD
        date string -- confirm against common.time_util).
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    dwd_app_install(data_dt)
    dwd_user_login_pc_record(data_dt)
    dwd_user_first_login_client_time(data_dt)

    logging.info(f'{function_name} end')


def dws_task_day(data_dt):
    '''
    DWS-layer placeholder of the daily schedule.

    Currently contains no steps: it only logs "<function name> start" and
    "<function name> end".

    :param data_dt: accepted for signature parity with the other layer tasks;
        not used because there are no steps yet.
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')

    logging.info(f'{function_name} end')


def ads_task_day(data_dt):
    '''
    ADS-layer steps of the daily schedule.

    Logs "<function name> start", runs ads_user_level_active,
    ads_platform_active and ads_app_install with data_dt, in that order, and
    then logs "<function name> end". An exception raised by one step
    propagates to the caller, so later steps and the end log line are skipped.

    :param data_dt: passed through unchanged to every step. day_task passes
        the first 10 characters of get_run_time(10) (presumably a YYYY-MM-DD
        date string -- confirm against common.time_util).
    '''
    # Name of the currently running function, used as the log prefix.
    function_name = sys._getframe().f_code.co_name
    logging.info(f'{function_name} start')
    ads_user_level_active(data_dt)
    ads_platform_active(data_dt)
    ads_app_install(data_dt)
    logging.info(f'{function_name} end')


def minute_task():
    '''
    Entry point for the minute-level schedule.

    Derives the data date from the run time lagged by 10 minutes, then runs
    the DWD, DWS and ADS minute tasks, in that order, with that date. Logs a
    start line before and an end line after.
    '''
    job_name = sys._getframe().f_code.co_name
    logging.info(f'{job_name} start')
    # get_run_time(10): the time as of 10 minutes ago. Its first 10 characters
    # are used as the data date (presumably YYYY-MM-DD -- confirm).
    lagged_date = get_run_time(10)[:10]
    for run_layer in (dwd_task_minute, dws_task_minute, ads_task_minute):
        run_layer(lagged_date)
    logging.info(f'{job_name} end')


def hour_task():
    '''
    Entry point for the two-hourly schedule.

    Derives the data date from the run time lagged by 10 minutes, then runs
    the DWD, DWS and ADS hourly tasks, in that order, with that date. Logs a
    start line before and an end line after.

    NOTE(review): run_hour_time at module level is computed with a 2.5-hour
    lag but is not used here, and ods_task_hour is not invoked -- confirm
    both are intended.
    '''
    job_name = sys._getframe().f_code.co_name
    logging.info(f'{job_name} start')
    # get_run_time(10): the time as of 10 minutes ago. Its first 10 characters
    # are used as the data date (presumably YYYY-MM-DD -- confirm).
    lagged_date = get_run_time(10)[:10]
    for run_layer in (dwd_task_hour, dws_task_hour, ads_task_hour):
        run_layer(lagged_date)
    logging.info(f'{job_name} end')


def day_task():
    '''
    Entry point for the daily schedule.

    Derives the data date from the run time lagged by 10 minutes, then runs
    the DWD, DWS and ADS daily tasks, in that order, with that date. Logs a
    start line before and an end line after.

    NOTE(review): the __main__ block schedules this job at hour 2; with only
    a 10-minute lag the data date presumably still falls on the current day
    -- confirm the daily steps expect that rather than the previous day.
    '''
    job_name = sys._getframe().f_code.co_name
    logging.info(f'{job_name} start')
    # get_run_time(10): the time as of 10 minutes ago. Its first 10 characters
    # are used as the data date (presumably YYYY-MM-DD -- confirm).
    lagged_date = get_run_time(10)[:10]
    for run_layer in (dwd_task_day, dws_task_day, ads_task_day):
        run_layer(lagged_date)
    logging.info(f'{job_name} end')



if __name__ == '__main__':
    # Register the three cron jobs, then hand control to the scheduler;
    # BlockingScheduler.start() blocks the calling thread.
    scheduler = BlockingScheduler()

    # Minute-level schedule: every 5 minutes, in every hour of the day.
    scheduler.add_job(minute_task, 'cron', hour='0-23', minute='*/5')

    # Every two hours, at minute 5.
    scheduler.add_job(hour_task, 'cron', hour='*/2', minute='5')

    # Daily job (runs at 02:00 every day).
    scheduler.add_job(day_task, "cron", hour='2')

    scheduler.start()