# -*- coding: utf-8 -*- import logging import sys from edw.dws.user.dws_user_content_order import dws_user_content_order ''' 调度任务,分层ods,dwd,dws,ads 分层调度 调度又根据业务需求,分为按小时调度和按天调度 ''' from apscheduler.schedulers.blocking import BlockingScheduler from common.time_util import get_run_time from edw.ads.basic.ads_app_install import ads_app_install from edw.ads.basic.ads_platform_active import ads_platform_active from edw.ads.basic.ads_user_level_active import ads_user_level_active from edw.ads.user.ads_user_content_order_records import ads_user_content_order_records from edw.ads.user.ads_user_watch_short_video import ads_user_watch_short_video from edw.dwd.basic.dwd_app_install import dwd_app_install from edw.dwd.user.dwd_user_content_order import dwd_user_content_order from edw.dwd.user.dwd_user_first_login_client_time import dwd_user_first_login_client_time from edw.dwd.user.dwd_user_login_pc_record import dwd_user_login_pc_record from edw.dwd.user.dwd_user_studio_add_content import dwd_user_studio_add_content from edw.dwd.user.dwd_user_watch_short_video import dwd_user_watch_short_video from edw.dws.user.dws_user_first_login_client_time import dws_user_first_login_client_time from edw.dws.user.dws_user_watch_short_video import dws_user_watch_short_video from edw.ads.user.ads_user_basic_behavior import ads_user_basic_behavior from edw.ads.user.ads_user_browse_fund import ads_user_browse_fund from edw.ads.user.ads_user_learn_course import ads_user_learn_course from edw.ads.user.ads_user_watch_live import ads_user_watch_live from edw.dwd.user.dwd_user_browse_fund import dwd_user_browse_fund from edw.dwd.user.dwd_user_learn_course import dwd_user_learn_course from edw.dwd.user.dwd_user_login_area import dwd_user_login_area from edw.dwd.user.dwd_user_login_environment import dwd_user_login_environment from edw.dwd.user.dwd_user_login_phone_mode import dwd_user_login_phone_mode from edw.dwd.user.dwd_user_share_event import dwd_user_share_event from edw.dwd.user.dwd_user_visitor_clues import dwd_user_visitor_clues from edw.dwd.user.dwd_user_watch_live import dwd_user_watch_live from edw.dws.user.dws_user_browse_fund import dws_user_browse_fund from edw.dws.user.dws_user_learn_course import dws_user_learn_course from edw.dws.user.dws_user_learn_total_dur import dws_user_learn_total_dur from edw.dws.user.dws_user_login_top_area import dws_user_login_top_area from edw.dws.user.dws_user_login_environment import dws_user_login_environment from edw.dws.user.dws_user_login_phone_mode import dws_user_login_phone_mode from edw.dws.user.dws_user_watch_live import dws_user_watch_live from edw.ods.user.ods_users_info import ods_users_info from edw.dwd.user.dwd_user_community_res import dwd_user_community_res logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO) file_name = sys.argv[0] # 按分钟定时 run_minute_time = get_run_time(30) # 滞后30分钟,去取时间。时间取值范围有全量,还有按天计算的(按天计算时,时间范围取值都是从0点~24点) data_minute_dt = run_minute_time[0: 10] # 延迟2.5 小时,去取数,因为定时任务是每隔2个小时运行 run_hour_time = get_run_time(60 * 2.5) data_dt = run_hour_time[0: 10] def dwd_task_minute(): ''' 按分钟定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') # 内容订单数据 dwd_user_content_order(data_dt) logging.info(f'{function_name} end') def dws_task_minute(): ''' 按分钟定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') # 内容订单数据 dws_user_content_order(data_dt) logging.info(f'{function_name} end') def ads_task_minute(): ''' 按分钟定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') # 内容订单数据 ads_user_content_order_records(data_dt) logging.info(f'{function_name} end') def ods_task_hour(): ''' 按小时定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') # 用户信息 ods_users_info() logging.info(f'{function_name} end') def dwd_task_hour(): ''' 按小时定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') dwd_user_browse_fund(data_dt) dwd_user_community_res(data_dt) dwd_user_login_area(data_dt) dwd_user_learn_course(data_dt) dwd_user_login_environment(data_dt) dwd_user_login_phone_mode(data_dt) dwd_user_share_event(data_dt) dwd_user_studio_add_content(data_dt) dwd_user_visitor_clues(data_dt) dwd_user_watch_live(data_dt) dwd_user_watch_short_video(data_dt) logging.info(f'{function_name} end') def dws_task_hour(): ''' 按小时定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') dws_user_login_top_area(data_dt) dws_user_login_phone_mode(data_dt) dws_user_learn_total_dur(data_dt) dws_user_first_login_client_time(data_dt) dws_user_login_environment(data_dt) dws_user_learn_course(data_dt) dws_user_browse_fund(data_dt) dws_user_watch_live(data_dt) dws_user_watch_short_video(data_dt) logging.info(f'{function_name} end') def ads_task_hour(): ''' 按小时定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') ads_user_basic_behavior(data_dt) ads_user_browse_fund(data_dt) ads_user_learn_course(data_dt) ads_user_watch_live(data_dt) ads_user_watch_short_video(data_dt) ads_user_content_order_records(data_dt) logging.info(f'{function_name} end') def dwd_task_day(): ''' 按天定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') dwd_app_install(data_dt) dwd_user_login_pc_record(data_dt) dwd_user_first_login_client_time(data_dt) logging.info(f'{function_name} end') def dws_task_day(): ''' 按天定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') logging.info(f'{function_name} end') def ads_task_day(): ''' 按天定时调度任务 ''' function_name = sys._getframe().f_code.co_name logging.info(f'{function_name} start') ads_user_level_active(data_dt) ads_platform_active(data_dt) ads_app_install(data_dt) logging.info(f'{function_name} end') if __name__ == '__main__': scheduler = BlockingScheduler() # 按分定时调度 scheduler.add_job(dwd_task_minute, "interval", minutes=10) scheduler.add_job(dws_task_minute, "interval", minutes=10) scheduler.add_job(ads_task_minute, "interval", minutes=10) # 两个小时定时调度 # scheduler.add_job(ods_task_hour, "interval", hours=2) scheduler.add_job(dwd_task_hour, "interval", minutes=30) scheduler.add_job(dws_task_hour, "interval", minutes=30) scheduler.add_job(ads_task_hour, "interval", minutes=30) # 按天定时任务 scheduler.add_job(dwd_task_day, "interval", hours=1) scheduler.add_job(dws_task_day, "interval", hours=1) scheduler.add_job(ads_task_day, "interval", hours=1) scheduler.start()