Commit 8e5e23e6 authored by 侯双强's avatar 侯双强

新增

parent 228ea14b
......@@ -7,6 +7,7 @@ import pymysql
from config.config import Settings
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
def get_connection():
return pymysql.connect(
host=Settings.MYSQL['host'],
......@@ -58,7 +59,7 @@ def fetch_one_list(sql, args):
def fetch_all_list(sql, args):
conn = get_connection()
cursor = conn.cursor()
cursor.execute(sql, args)
cursor.executemany(sql, args)
data_list = cursor.fetchall()
conn.commit()
connect_close(cursor, conn)
......@@ -131,10 +132,10 @@ def connect_close(cursor, conn):
# 保存至mysql
def save_result(database, table, result_dict, file_name):
row = 0
if result_dict:
result_table = database + '.' + table
logging.info(f'save {result_table} start')
row = 0
fields = None
place_holder = None
values = list()
......@@ -145,11 +146,49 @@ def save_result(database, table, result_dict, file_name):
values.append(tuple(i.values()))
sql = f'''replace into {result_table} ( {fields} ) values ( {place_holder} )'''
rs = insert_batch(sql, values)
if rs == row or rs == 2 * row: # 因为这里用的是replace
if rs >= row:
logging.info(f'save {result_table} success {row}')
else:
logging.error(f'save {result_table} error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')
return row
def save_etl_log(database, table, data_dt, row, status, task_file, now_time):
sql = f'''replace into tamp_data_dwd.dwd_etl_log (data_dt,`database`,`table`,`rows`,`status`,task_file,run_time)
values('{data_dt}', '{database}', '{table}', {row}, '{status}', '{task_file}', '{now_time}')'''
insert(sql, None)
# def upsert(database, table, result_dict, file_name):
# '''
# 更新或插入,存在则更新,不存在,则插入
# :param database:
# :param table:
# :param result_dict:
# :param file_name:
# :return:
# '''
# if result_dict:
# result_table = database + '.' + table
# logging.info(f'upsert {result_table} start')
# row = 0
# fields = None
# place_holder = None
# values = list()
# for i in result_dict:
# row = row + 1
# fields = ','.join([f"`{k}`" for k in i.keys()])
# place_holder = ','.join(["%s" for _ in i.keys()])
# values.append(tuple(i.values()))
#
#
# sql = f'''replace into {result_table} ( {fields} ) values ( {place_holder} )'''
# rs = insert_batch(sql, values)
# if rs == row or rs == 2 * row: # 因为这里用的是replace
# logging.info(f'upsert {result_table} success {row}')
# else:
# logging.error(f'upsert {result_table} error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')
def rows_to_dict_list(cursor):
......
# -*- coding: utf-8 -*-
import logging
import sys
from sqlite3 import Row
import pymysql
from config.test_config import Settings
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
def get_connection():
return pymysql.connect(
......@@ -17,21 +19,48 @@ def get_connection():
)
def fetch_one(sql):
def fetch_one(sql, args):
conn = get_connection()
cursor = conn.cursor()
cursor.execute(sql, args)
column_names = [col[0] for col in cursor.description]
dict_res = [dict(zip(column_names, row)) for row in cursor.fetchall()]
conn.commit()
connect_close(cursor, conn)
return dict_res
def fetch_all(sql, args):
conn = get_connection()
cursor = conn.cursor()
cursor.execute(sql)
cursor.execute(sql, args)
column_names = [col[0] for col in cursor.description]
dict_res = [dict(zip(column_names, row)) for row in cursor.fetchall()]
conn.commit()
connect_close(cursor, conn)
return dict_res
def fetch_one_list(sql, args):
conn = get_connection()
cursor = conn.cursor()
cursor.execute(sql, args)
data = cursor.fetchone()
conn.commit()
connect_close(cursor, conn)
return data
def fetch_all(sql, args):
def fetch_all_list(sql, args):
conn = get_connection()
cursor = conn.cursor()
cursor.execute(sql, args)
data_list = cursor.fetchall()
conn.commit()
connect_close(cursor, conn)
return data_list
......@@ -42,9 +71,26 @@ def insert(sql, args):
cursor = conn.cursor()
row = cursor.execute(sql, args)
conn.commit()
connect_close(cursor, conn)
return row
# def insert_batch(sql, args):
# conn = get_connection()
# cursor = conn.cursor()
# # row = cursor.executemany(sql, args)
# # conn.commit()
# # connect_close(cursor, conn)
# row = 0
# try:
# row = cursor.executemany(sql, args)
# logging.info(sql, args)
# conn.commit()
# except Exception as e:
# cursor.rollback()
# logging.error(e)
# connect_close(cursor, conn)
# return row
def insert_batch(sql, args):
......@@ -53,9 +99,11 @@ def insert_batch(sql, args):
row = cursor.executemany(sql, args)
conn.commit()
connect_close(cursor, conn)
return row
def update(sql, args):
conn = get_connection()
cursor = conn.cursor()
......@@ -70,18 +118,40 @@ def update(sql, args):
def update_batch(sql, args):
conn = get_connection()
cursor = conn.cursor()
row = None
try:
row = cursor.executemany(sql, args)
conn.commit()
except Exception as e:
logging.error(e)
conn.rollback()
finally:
connect_close(cursor, conn)
row = cursor.executemany(sql, args)
conn.commit()
connect_close(cursor, conn)
return row
def connect_close(cursor, conn):
cursor.close()
conn.close()
# 保存至mysql
def save_result(database, table, result_dict, file_name):
if result_dict:
result_table = database + '.' + table
logging.info(f'save {result_table} start')
row = 0
fields = None
place_holder = None
values = list()
for i in result_dict:
row = row + 1
fields = ','.join([f"`{k}`" for k in i.keys()])
place_holder = ','.join(["%s" for _ in i.keys()])
values.append(tuple(i.values()))
sql = f'''replace into {result_table} ( {fields} ) values ( {place_holder} )'''
rs = insert_batch(sql, values)
if rs == row or rs == 2 * row: # 因为这里用的是replace
logging.info(f'save {result_table} success {row}')
else:
logging.error(f'save {result_table} error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')
def rows_to_dict_list(cursor):
columns = [i[0] for i in cursor.description]
return [dict(zip(columns, row)) for row in cursor]
\ No newline at end of file
......@@ -74,6 +74,11 @@ def get_run_time(args):
# def get_current_week():
# """
# 给定一个日期-返回日期所在周的周一0点时间 和 周日23点59分59秒
# :param date_str: 如:"2020-05-01"
# :return: 给定一个日期-返回日期所在周的周一0点时间 和 周日23点59分59秒
# """
# import datetime
# monday, sunday = datetime.date.today(), datetime.date.today()
# one_day = datetime.timedelta(days=1)
......@@ -88,43 +93,43 @@ def get_run_time(args):
# }
# return week_dict
# def get_current_week(data_dt):
# """
# 给定一个日期-返回日期所在周的周一0点时间 和 周日23点59分59秒
# :param date_str: 如:"2020-05-01"
# :return: 给定一个日期-返回日期所在周的周一0点时间 和 周日23点59分59秒
# """
# import datetime
# now_time = datetime.datetime.strptime(data_dt + " 00:00:00", "%Y-%m-%d %H:%M:%S")
# monday = now_time - datetime.timedelta(days=now_time.weekday(), hours=now_time.hour,
# minutes=now_time.minute, seconds=now_time.second,
# microseconds=now_time.microsecond)
# sunday = monday + datetime.timedelta(days=6, hours=23, minutes=59, seconds=59)
# week_dict = {
# 'monday': monday,
# 'sunday': sunday
# }
# return week_dict
def get_current_week(data_dt):
"""
给定一个日期-返回日期所在上周五的0点时间 和 本周四23点59分59秒
给定一个日期-返回日期所在周的周一0点时间 和 周日23点59分59秒
:param date_str: 如:"2020-05-01"
:return: 给定一个日期-返回日期所在上周五的0点时间 和 本周四23点59分59秒
:return: 给定一个日期-返回日期所在周的周一0点时间 和 周日23点59分59秒
"""
import datetime
now_time = datetime.datetime.strptime(data_dt + " 00:00:00", "%Y-%m-%d %H:%M:%S")
last_friday = now_time - datetime.timedelta(days=(now_time.weekday() + 3) % 7, hours=now_time.hour,
monday = now_time - datetime.timedelta(days=now_time.weekday(), hours=now_time.hour,
minutes=now_time.minute, seconds=now_time.second,
microseconds=now_time.microsecond)
thursday = last_friday + datetime.timedelta(days=6, hours=23, minutes=59, seconds=59)
sunday = monday + datetime.timedelta(days=6, hours=23, minutes=59, seconds=59)
week_dict = {
'last_friday': last_friday,
'thursday': thursday
'monday': monday,
'sunday': sunday
}
return week_dict
#
# def get_current_week(data_dt):
# """
# 给定一个日期-返回日期所在上周五的0点时间 和 本周四23点59分59秒
# :param date_str: 如:"2020-05-01"
# :return: 给定一个日期-返回日期所在上周五的0点时间 和 本周四23点59分59秒
# """
# import datetime
# now_time = datetime.datetime.strptime(data_dt + " 00:00:00", "%Y-%m-%d %H:%M:%S")
# last_friday = now_time - datetime.timedelta(days=(now_time.weekday() + 3) % 7, hours=now_time.hour,
# minutes=now_time.minute, seconds=now_time.second,
# microseconds=now_time.microsecond)
# thursday = last_friday + datetime.timedelta(days=6, hours=23, minutes=59, seconds=59)
# week_dict = {
# 'last_friday': last_friday,
# 'thursday': thursday
# }
# return week_dict
# 获取周一至周日的日期
......
# -*- coding: utf-8 -*-
'''
app安装量,定时任务,每天运行一次
'''
import json
import logging
import os
import sys
import time
import requests
from common.mysql_uitl import save_result, fetch_all, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def ads_app_install(data_dt):
install_num_dict = query_app_install(data_dt)
row = save_result('tamp_data_ads', 'ads_app_install', install_num_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_app_install', data_dt, row, 'done', task_file, now_time)
def query_app_install(data_dt):
data_dt = (datetime.datetime.strptime(str(data_dt), "%Y-%m-%d") - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = '''
select p.data_dt
,case when weekday(p.data_dt) + 1 = '1' then '星期一'
when weekday(p.data_dt) + 1 = '2' then '星期二'
when weekday(p.data_dt) + 1 = '3' then '星期三'
when weekday(p.data_dt) + 1 = '4' then '星期四'
when weekday(p.data_dt) + 1 = '5' then '星期五'
when weekday(p.data_dt) + 1 = '6' then '星期六'
when weekday(p.data_dt) + 1 = '7' then '星期日'
else '计算错误'
end as weekday
,sum(p.pc_num) as pc_num
,sum(p.ios_num) as ios_num
,sum(p.android_num) as android_num
,sum(p.total_app_num) as total_app_num
from
(
select data_dt
,count(1) as pc_num
,0 as ios_num
,0 as android_num
,0 as total_app_num
from tamp_data_dwd.dwd_user_login_pc_record
where data_dt = %s
and team_id <> '0'
group by data_dt
union all
select data_dt
,0 as pc_num
,ios_num
,android_num
,total_app_num
from tamp_data_dwd.dwd_app_install
where data_dt = %s
) p
group by p.data_dt, weekday
'''
install_num_dict = fetch_all(sql, (data_dt, data_dt))
logging.info(f'{function_name} success')
return install_num_dict
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 9, 19)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
ads_app_install(data_dt)
data_dt += delta
\ No newline at end of file
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import get_current_week, month_start_and_end, now_str, YMDHMS_FORMAT
'''
按平台统计用户日活/周活/月活
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def ads_platform_active(data_dt):
# 日活
platform_dau_dict = query_platform_dau(data_dt)
row = save_result('tamp_data_ads', 'ads_platform_dau', platform_dau_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_platform_dau', data_dt, row, 'done', task_file, now_time)
# 周活
platform_wau_dict = query_platform_wau(data_dt)
row = save_result('tamp_data_ads', 'ads_platform_wau', platform_wau_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_platform_wau', data_dt, row, 'done', task_file, now_time)
# 月活
platform_mau_dict = query_platform_mau(data_dt)
row = save_result('tamp_data_ads', 'ads_platform_mau', platform_mau_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_platform_mau', data_dt, row, 'done', task_file, now_time)
def query_platform_dau(data_dt):
'''
按平台统计用户日活
:param data_dt:
:return:
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = '''
select data_dt
,case when weekday(data_dt) + 1 = '1' then '星期一'
when weekday(data_dt) + 1 = '2' then '星期二'
when weekday(data_dt) + 1 = '3' then '星期三'
when weekday(data_dt) + 1 = '4' then '星期四'
when weekday(data_dt) + 1 = '5' then '星期五'
when weekday(data_dt) + 1 = '6' then '星期六'
when weekday(data_dt) + 1 = '7' then '星期日'
else '计算错误'
end as weekday
,count(distinct case when env = 'android' then user_id end) as android_dau
,count(distinct case when env = 'ios' then user_id end) as ios_dau
,count(distinct case when env in ('android', 'ios') then user_id end) as app_dau
,count(distinct case when env = 'wechat' then user_id end) as wechat_dau
,count(distinct case when env = 'xcx' then user_id end) as xcx_dau
,count(distinct case when env = 'PCManager' then user_id end) as pc_dau
,count(distinct user_id) as all_dau
from tamp_data_dwd.dwd_user_login_environment
where data_dt = %s
and team_id <> '0'
group by data_dt, weekday
'''
platform_dau_dict = fetch_all(sql, data_dt)
logging.info(f'{function_name} success')
return platform_dau_dict
def query_platform_wau(data_dt):
'''
按平台统计用户周活
:param data_dt:
:return:
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
current_week = get_current_week(data_dt)
start_date = str(current_week['monday'])[0: 10]
end_date = str(current_week['sunday'])[0: 10]
sql = '''
select concat_ws('~', %s, %s) as data_dt
,count(distinct case when env = 'android' then user_id end) as android_wau
,count(distinct case when env = 'ios' then user_id end) as ios_wau
,count(distinct case when env in ('android', 'ios') then user_id end) as app_wau
,count(distinct case when env = 'wechat' then user_id end) as wechat_wau
,count(distinct case when env = 'xcx' then user_id end) as xcx_wau
,count(distinct case when env = 'PCManager' then user_id end) as pc_wau
,count(distinct user_id) as all_wau
from tamp_data_dwd.dwd_user_login_environment
where data_dt between %s and %s
and team_id <> '0'
group by concat_ws('~', %s, %s)
'''
platform_wau_dict = fetch_all(sql, (start_date, end_date, start_date, end_date, start_date, end_date))
logging.info(f'{function_name} success')
return platform_wau_dict
def query_platform_mau(data_dt):
'''
按平台统计用户月活
:param data_dt:
:return:
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
month_dict = month_start_and_end(data_dt)
start_date = month_dict.get('start_date')
end_date = month_dict.get('end_date')
sql = '''
select date_format(data_dt, '%%Y-%%m') as data_dt
,count(distinct case when env = 'android' then user_id end) as android_mau
,count(distinct case when env = 'ios' then user_id end) as ios_mau
,count(distinct case when env in ('android', 'ios') then user_id end) as app_mau
,count(distinct case when env = 'wechat' then user_id end) as wechat_mau
,count(distinct case when env = 'xcx' then user_id end) as xcx_mau
,count(distinct case when env = 'PCManager' then user_id end) as pc_mau
,count(distinct user_id) as all_mau
from tamp_data_dwd.dwd_user_login_environment
where data_dt between %s and %s
group by date_format(data_dt, '%%Y-%%m')
'''
platform_mau_dict = fetch_all(sql, (start_date, end_date))
logging.info(f'{function_name} success')
return platform_mau_dict
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 1, 1)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
ads_platform_active(str(data_dt))
data_dt += delta
This diff is collapsed.
......@@ -4,24 +4,28 @@
'''
用户登录行为汇总,定时任务,每2个小时运行一次
'''
import os
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import save_result, save_etl_log
import logging
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def ads_user_basic_behavior():
def ads_user_basic_behavior(data_dt):
user_basic_behavior_dict = query_ads_user_basic_behavior()
save_ads_user_basic_behavior(user_basic_behavior_dict)
# save_ads_user_basic_behavior(user_basic_behavior_dict)
row = save_result('tamp_data_ads', 'ads_user_basic_behavior', user_basic_behavior_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_level_dau', data_dt, row, 'done', task_file, now_time)
# 这里有点细节,没有严格去做。
......
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
'''
用户浏览产品明细数据统计和汇总数据统计,定时任务,每2个小时运行一次
......@@ -14,13 +16,20 @@ p2060 探普产品详情
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def ads_user_browse_fund(data_dt):
browse_fund_details_dict = query_user_browse_fund_details(data_dt)
row = save_result('tamp_data_ads', 'ads_user_browse_fund_details', browse_fund_details_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_browse_fund_details', data_dt, row, 'done', task_file, now_time)
browse_fund_summary_dict = query_user_browse_fund_summary()
save_result('tamp_data_ads', 'ads_user_browse_fund_details', browse_fund_details_dict, file_name)
save_result('tamp_data_ads', 'ads_user_browse_fund_summary', browse_fund_summary_dict, file_name)
row = save_result('tamp_data_ads', 'ads_user_browse_fund_summary', browse_fund_summary_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_browse_fund_summary', data_dt, row, 'done', task_file, now_time)
def query_user_browse_fund_details(data_dt):
......
......@@ -6,16 +6,22 @@
注意:在ads层需要隐藏手机号
'''
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result
from common.mysql_uitl import fetch_all, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
file_name = sys.argv
file_name1 = os.path.basename(__file__)
task_file = os.path.split(__file__)[-1].split(".")[0]
def ads_user_content_order_records():
def ads_user_content_order_records(data_dt):
order_records_dict = query_user_content_order()
save_result('tamp_data_ads', 'ads_user_content_order_records', order_records_dict, file_name)
row = save_result('tamp_data_ads', 'ads_user_content_order_records', order_records_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_content_order_records', data_dt, row, 'done', task_file, now_time)
def query_user_content_order():
......@@ -61,4 +67,4 @@ def query_user_content_order():
if __name__ == '__main__':
ads_user_content_order_records()
\ No newline at end of file
ads_user_content_order_records()
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
'''
用户观看直播明细数据统计,定时任务,每2个小时运行一次
......@@ -16,6 +18,7 @@ p10507 试听课程页面(课程包)
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def ads_user_learn_course(data_dt):
......@@ -27,15 +30,19 @@ def ads_user_learn_course(data_dt):
learn_course_details_dict = query_user_learn_course_details(data_dt)
merge_learn_course(learn_course_details_dict, invite_people_record_result_dict)
# 保存明細数据
save_result('tamp_data_ads', 'ads_user_learn_course_details', learn_course_details_dict, file_name)
row = save_result('tamp_data_ads', 'ads_user_learn_course_details', learn_course_details_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_learn_course_details', data_dt, row, 'done', task_file, now_time)
single_course_invite_summary_dict = query_single_course_invite_people_summary()
course_invite_summary_dict = query_course_invite_people_summary()
invite_people_summary_result_dict = merge_course_invite_people(single_course_invite_summary_dict, course_invite_summary_dict)
learn_course_summary_dict = query_user_learn_course_summary()
merge_learn_course(learn_course_summary_dict, invite_people_summary_result_dict)
# # 保存汇总数据
# 保存汇总数据
save_result('tamp_data_ads', 'ads_user_learn_course_summary', learn_course_summary_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_learn_course_summary', data_dt, row, 'done', task_file, now_time)
# 单节课和课程包邀请人数,没有严格去重邀请人数,只是对单节课程,邀请人数做了去重,课程包邀请人数做了去重
......@@ -238,7 +245,7 @@ def query_user_learn_course_summary():
,q.team_id
,p.course_id
,p.course_name
,coalesce(q.course_type, '免费') as course_type
,coalesce(t.course_type, '免费') as course_type
,p.learn_dur
,p.total_dur
,if(p.play_rate >=100.00, 100.00, p.play_rate) as play_rate
......@@ -249,13 +256,13 @@ def query_user_learn_course_summary():
,user_id
,course_id
,course_name
,total_dur
,max(total_dur) as total_dur # 后面可能有新课程增加
,sum(learn_dur) as learn_dur
,round(sum(learn_dur) / total_dur * 100, 2) as play_rate
,online_time
,sum(share_num) as share_num
from tamp_data_dws.dws_user_learn_course
group by user_id,course_id,course_name,online_time,total_dur
group by user_id,course_id,course_name,online_time
) p
left join
(
......@@ -283,9 +290,9 @@ def query_user_learn_course_summary():
on p.user_id = q.user_id
order by p.user_id, p.data_dt desc, p.course_id
'''
watch_live_summary_dict = fetch_all(sql, None)
learn_course_summary_dict = fetch_all(sql, None)
logging.info(f'{function_name} success')
return watch_live_summary_dict
return learn_course_summary_dict
def merge_course_invite_people(dict1_list, dict2_list):
......@@ -353,8 +360,8 @@ def merge_learn_course(learn_course_dict, invite_dict):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 14)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 16)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
'''
用户观看直播明细数据统计,定时任务,每2个小时运行一次
......@@ -11,13 +13,19 @@ from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def ads_user_watch_live(data_dt):
watch_live_details_dict = query_user_watch_live_details(data_dt)
row = save_result('tamp_data_ads', 'ads_user_watch_live_details', watch_live_details_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_watch_live_details', data_dt, row, 'done', task_file, now_time)
watch_live_summary_dict = query_user_watch_live_summary()
save_result('tamp_data_ads', 'ads_user_watch_live_details', watch_live_details_dict, file_name)
save_result('tamp_data_ads', 'ads_user_watch_live_summary', watch_live_summary_dict, file_name)
row = save_result('tamp_data_ads', 'ads_user_watch_live_summary', watch_live_summary_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_watch_live_summary', data_dt, row, 'done', task_file, now_time)
def query_user_watch_live_details(data_dt):
......
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
'''
用户观看短视频明细数据统计,定时任务,每2个小时运行一次
......@@ -11,13 +13,19 @@ from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def ads_user_watch_short_video(data_dt):
watch_short_video_details_dict = query_user_watch_short_video_details(data_dt)
row = save_result('tamp_data_ads', 'ads_user_watch_short_video_details', watch_short_video_details_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_watch_short_video_details', data_dt, row, 'done', task_file, now_time)
watch_short_video_summary_dict = query_user_watch_short_video_summary()
save_result('tamp_data_ads', 'ads_user_watch_short_video_details', watch_short_video_details_dict, file_name)
save_result('tamp_data_ads', 'ads_user_watch_short_video_summary', watch_short_video_summary_dict, file_name)
row = save_result('tamp_data_ads', 'ads_user_watch_short_video_summary', watch_short_video_summary_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_ads', 'ads_user_watch_short_video_summary', data_dt, row, 'done', task_file, now_time)
def query_user_watch_short_video_details(data_dt):
......
# -*- coding: utf-8 -*-
'''
access_log 历史数据从测试环境同步至生产环境。(手工同步一次,不做定时任务)
'''
import logging
import sys
from common.mysql_uitl import save_result
from common.test_mysql_uitl import fetch_all
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
def dwd_access_history_log(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
access_log_dict = query_access_history_log(start_time, end_time)
save_result('tamp_analysis', 'access_history_log', access_log_dict, file_name)
def query_access_history_log(start_time, end_time):
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = '''select * from tamp_data_analysis.access_log where server_time between %s and %s'''
access_log_dict = fetch_all(sql, (start_time, end_time))
logging.info(f'{function_name} success')
return access_log_dict
if __name__ == '__main__':
# dwd_app_install(data_dt)
import datetime
begin = datetime.date(2021, 1, 1)
end = datetime.date(2021, 4, 4)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dwd_access_history_log(data_dt)
data_dt += delta
......@@ -2,19 +2,22 @@
'''
,定时任务,每天运行一次
app安装量,定时任务,每天运行一次
'''
import json
import logging
import os
import sys
import time
import requests
from common.mysql_uitl import save_result
from common.mysql_uitl import save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_app_install(data_dt):
......@@ -32,10 +35,14 @@ def dwd_app_install(data_dt):
android_dict = get_app_install(android_url, 'android')
ios_dict = get_app_install(ios_url, 'ios')
ret_dict = dict(android_dict, **ios_dict)
ret_list = list()
ret_list.append(ret_dict)
if ret_list:
save_result('tamp_data_dwd', 'dwd_app_install', ret_list, file_name)
total_app_num = ret_dict['android_num'] + ret_dict['ios_num']
ret_dict['total_app_num'] = total_app_num
if ret_dict:
ret_list = list()
ret_list.append(ret_dict)
row = save_result('tamp_data_dwd', 'dwd_app_install', ret_list, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_app_install', data_dt, row, 'done', task_file, now_time)
def get_app_install(url, app_type):
......@@ -44,9 +51,7 @@ def get_app_install(url, app_type):
response_ret = requests.get(url)
content = response_ret.content.decode()
content_dict = json.loads(content)
print(url)
body = content_dict['body']
print(body)
install_dict = {}
for i in body:
data_dt = i['date'][0: 10]
......@@ -61,8 +66,8 @@ def get_app_install(url, app_type):
if __name__ == '__main__':
# dwd_app_install(data_dt)
import datetime
begin = datetime.date(2020, 9, 15)
end = datetime.date(2021, 9, 15)
begin = datetime.date(2021, 9, 18)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -5,20 +5,24 @@
用户浏览基金详情,定时任务,每2个小时运行一次
'''
import logging
import os
import sys
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_browse_fund(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
browse_fund_dict = query_user_browse_fund(start_time, end_time)
save_dwd_user_browse_fund(browse_fund_dict)
# save_dwd_user_browse_fund(browse_fund_dict)
row = save_result('tamp_data_dwd', 'dwd_user_browse_fund', browse_fund_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_browse_fund', data_dt, row, 'done', task_file, now_time)
def query_user_browse_fund(start_time, end_time):
......
This diff is collapsed.
......@@ -7,17 +7,22 @@
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch, save_result
from common.mysql_uitl import fetch_all, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# file_path = get_file_path()
file_name = sys.argv[0]
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_content_order():
def dwd_user_content_order(data_dt):
content_order_dict = query_dwd_user_content_order()
save_result('tamp_data_dwd', 'dwd_user_content_order', content_order_dict, file_name)
row = save_result('tamp_data_dwd', 'dwd_user_content_order', content_order_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_content_order', data_dt, row, 'done', task_file, now_time)
def query_dwd_user_content_order():
......
# -*- coding: utf-8 -*-
'''
用户首次登录客户端时间,定时任务,每天运行一次
'''
import logging
import sys
from common.mysql_uitl import fetch_all, insert_batch, save_result, update_batch
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.ERROR)
file_name = sys.argv[0]
def dwd_user_first_login_client_time(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
login_time_dict = query_first_login_time(start_time, end_time)
save_user_login_time(login_time_dict)
# update_user_name()
def query_first_login_time(start_time, end_time):
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = '''
select `uid` as user_id
,min(if(env = 'ios',server_time,null)) as login_ios_time
,min(if(env = 'android',server_time, null)) as login_android_time
,min(if(env = 'PCManager',server_time, null)) as login_pc_time
,min(if(env = 'wechat',server_time, null)) as login_wechat_time
,min(if(env = 'xcx',server_time, null)) as login_xcx_time
# from tamp_analysis.access_log
from tamp_analysis.access_history_log
where event_type = '1002'
and server_time between %s and %s
and env <> ''
and env is not null
and `uid` <> ''
and `uid` is not null
group by `uid`
'''
exist_users_dict = fetch_all(sql, (start_time, end_time))
logging.info(f'{function_name} success')
return exist_users_dict
def save_user_login_time(login_time_dict):
if login_time_dict:
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
database = 'tamp_data_dwd'
table = 'dwd_user_first_login_client_time'
result_table = database + '.' + table
field_list = ['login_wechat_time', 'login_ios_time', 'login_android_time', 'login_pc_time', 'login_xcx_time']
for field in field_list:
ios_dict_list = list()
android_dict_list = list()
pc_dict_list = list()
wechat_dict_list = list()
xcx_dict_list = list()
for i in login_time_dict:
keys = list(i.keys())
for key in keys:
if key == 'login_wechat_time':
wechat_dict_list.append(dict(user_id=i['user_id'], login_wechat_time=i[field]))
elif key == 'login_ios_time':
ios_dict_list.append(dict(user_id=i['user_id'], login_ios_time=i[field]))
elif key == 'login_android_time':
android_dict_list.append(dict(user_id=i['user_id'], login_android_time=i[field]))
elif key == 'login_pc_time':
pc_dict_list.append(dict(user_id=i['user_id'], login_pc_time=i[field]))
elif key == 'login_xcx_time':
xcx_dict_list.append(dict(user_id=i['user_id'], login_xcx_time=i[field]))
else:
pass
''' 应该分三种情况,1.存在这个用户,却没有登录过这个客户端,2.存在这个用户,登录过这个客户端,3.没有存在这个用户'''
# 结果表中,存在这个用户,但之前沒有登录这个客户端的,则需要给这个客户端增加登录时间
sql1 = f'''select {field}, user_id from {result_table}
where user_id in ({','.join(["'%s'" % item['user_id'] for item in login_time_dict])})
and {field} is null '''
exist_users_not_login_dict = fetch_all(sql1, None)
exist_users_not_login_list = list()
if field == 'login_wechat_time':
exist_users_not_login_wechat_dict = [x for x in wechat_dict_list if x in exist_users_not_login_dict]
for i in exist_users_not_login_wechat_dict:
if i['login_wechat_time']:
exist_users_not_login_list.append([i[field], i['user_id']])
elif field == 'login_ios_time':
exist_users_not_login_ios_dict = [x for x in ios_dict_list if x in exist_users_not_login_dict]
for i in exist_users_not_login_ios_dict:
if i['login_ios_time']:
exist_users_not_login_list.append([i[field], i['user_id']])
elif field == 'login_android_time':
exist_users_not_login_android_dict = [x for x in android_dict_list if x in exist_users_not_login_dict]
for i in exist_users_not_login_android_dict:
if i['login_android_time']:
exist_users_not_login_list.append([i[field], i['user_id']])
elif field == 'login_pc_time':
exist_users_not_login_pc_dict = [x for x in pc_dict_list if x in exist_users_not_login_dict]
for i in exist_users_not_login_pc_dict:
if i['login_pc_time']:
exist_users_not_login_list.append([i[field], i['user_id']])
elif field == 'login_xcx_time':
exist_users_not_login_xcx_dict = [x for x in xcx_dict_list if x in exist_users_not_login_dict]
for i in exist_users_not_login_xcx_dict:
if i['login_xcx_time']:
exist_users_not_login_list.append([i[field], i['user_id']])
update = f'''update {result_table} set {field} = (%s) where user_id = (%s)'''
update_batch(update, exist_users_not_login_list)
# 结果表中,存在这个用户
sql2 = f'''select {field}, user_id from {result_table}
where user_id in ({','.join(["'%s'" % item['user_id'] for item in login_time_dict])})'''
exist_users_dict = fetch_all(sql2, None)
not_exist_users_dict = list()
if field == 'login_wechat_time':
wechat_tmp = list()
for i in wechat_dict_list:
if i['login_wechat_time']:
wechat_tmp.append(i)
not_exist_users_dict = [x for x in wechat_tmp if x not in exist_users_dict]
elif field == 'login_ios_time':
ios_tmp = list()
for i in ios_dict_list:
print(i)
if i['login_ios_time']:
print(i['login_ios_time'])
ios_tmp.append(i)
not_exist_users_dict = [x for x in ios_tmp if x not in exist_users_dict]
elif field == 'login_android_time':
android_tmp = list()
for i in android_dict_list:
if i['login_android_time']:
android_tmp.append(i)
not_exist_users_dict = [x for x in android_tmp if x not in exist_users_dict]
elif field == 'login_pc_time':
pc_tmp = list()
for i in pc_dict_list:
if i['login_pc_time']:
pc_tmp.append(i)
not_exist_users_dict = [x for x in pc_tmp if x not in exist_users_dict]
elif field == 'login_xcx_time':
xcx_tmp = list()
for i in xcx_dict_list:
if i['login_xcx_time']:
xcx_tmp.append(i)
not_exist_users_dict = [x for x in xcx_tmp if x not in exist_users_dict]
save_result(database, table, not_exist_users_dict, file_name)
logging.info(f'{function_name} success')
# # print(result_list)
# # print(client_dict_list)
# # print(exist_users_not_login_dict)
#
#
# update = f'''update {result_table} set {field} = (%s) where user_id = (%s)'''
#
# # 在登录端口,已经有过登录行记录的,则不更新时间
# sql1 = f'''select user_id, {field} from {result_table}
# where user_id in ({','.join(["'%s'" % item['user_id'] for item in login_time_dict])})'''
# exist_users_dict = fetch_all(sql1, None)
#
# # print(update)
# # 已经存在的用户,则更新另外一个客户端登录的时间
# (update, result_list)
# # 不存在的用户,则插入新数据
# not_exist_users_dict = [x for x in client_dict_list if x not in exist_users_dict]
# save_result(database, table, not_exist_users_dict, file_name)
# logging.info(f'{function_name} success')
def update_user_name():
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = '''
update tamp_data_dwd.dwd_user_first_login_client_time p
left join tamp_analysis.user_info_view t
on p.user_id = t.user_id
set p.real_name = t.real_name,
p.user_name = t.user_name,
p.nickname = t.nickname,
p.team_id = t.team_id
'''
fetch_all(sql, None)
logging.info(f'{function_name} success')
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 1, 1)
end = datetime.date(2021, 3, 31)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dwd_user_first_login_client_time(data_dt)
data_dt += delta
# -*- coding: utf-8 -*-
'''
用户首次登录客户端时间,定时任务,每天运行一次
'''
import logging
import sys
from common.mysql_uitl import fetch_all, insert_batch, save_result
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.ERROR)
file_name = sys.argv[0]
def dwd_user_first_login_client_time(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
database = 'tamp_data_dwd'
table = 'dwd_user_first_login_client_time'
# filed_list = ['wechat', 'ios', 'android', 'pc', 'xcx']
filed_list = ['wechat', 'ios', 'android']
for i in filed_list:
filed = 'login_' + i + '_time'
if i == 'pc':
exist_login_dict = query_exist_users(filed)
new_login_users_dict = query_new_users(start_time, end_time, filed, 'PCManager', exist_login_dict)
upsert(database, table, new_login_users_dict, filed)
else:
exist_login_dict = query_exist_users(filed)
new_login_users_dict = query_new_users(start_time, end_time, filed, i, exist_login_dict)
upsert(database, table, new_login_users_dict, filed)
def query_exist_users(field):
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = f'select user_id from tamp_data_dwd.dwd_user_first_login_client_time where {field} is not null'
exist_users_dict = fetch_all(sql, None)
logging.info(f'{function_name} success')
return exist_users_dict
def query_new_users(start_time, end_time, field, param, exist_users_dict):
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
print(field)
sql = f'''
select `uid` as user_id
,min(server_time) as {field}
from tamp_analysis.access_history_log
where server_time between %s and %s
and env = %s
and event_type = '1002'
and `uid` <> ''
and `uid` not in ({','.join(["'%s'" % item['user_id'] for item in exist_users_dict])})
group by `uid`
'''
print(sql, (start_time, end_time, param))
new_users_dict = fetch_all(sql, (start_time, end_time, param))
logging.info(f'{function_name} success')
return new_users_dict
def upsert(database, table, result_dict, field):
'''
更新或插入,存在则更新,不存在,则插入
:param database:
:param table:
:param result_dict:
:param field:
:return:
'''
if result_dict:
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
result_table = database + '.' + table
logging.info(f'upsert {result_table} start')
result_list = list()
for i in result_dict:
result_list.append([i[field], i['user_id']])
sql = f'''select user_id from {result_table}
where user_id in ({','.join(["'%s'" % item['user_id'] for item in result_dict])})
and {field} is not null '''
exist_users_dict = fetch_all(sql, None)
update = f'''update {result_table} set {field} = (%s) where user_id = (%s)'''
print(update)
# 已经存在的用户,则更新另外一个客户端登录的时间
insert_batch(update, result_list)
# 不存在的用户,则插入新数据
not_exist_users_dict = [x for x in result_dict if x not in exist_users_dict]
print(not_exist_users_dict)
save_result(database, table, not_exist_users_dict, file_name)
logging.info(f'{function_name} success')
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 1, 1)
end = datetime.date(2021, 1, 17)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dwd_user_first_login_client_time(data_dt)
data_dt += delta
......@@ -5,23 +5,26 @@
用户学习课程明细统计,定时任务,每2个小时运行一次
'''
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_etl_log, save_result
from common.time_util import YMDHMS_FORMAT, now_str
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# file_path = get_file_path()
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_learn_course(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
learn_course_dict = query_dwd_user_learn_course(start_time, end_time)
save_dwd_user_learn_course(learn_course_dict)
# save_ads_user_learn_course(learn_course_dict) # 明细数据双写
row = save_result('tamp_data_dwd', 'dwd_user_learn_course', learn_course_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_learn_course', data_dt, row, 'done', task_file, now_time)
def query_dwd_user_learn_course(start_time, end_time):
......
......@@ -5,23 +5,27 @@
用户登录地区,定时任务,每2个小时运行一次
'''
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# file_path = get_file_path()
# file_name = get_file_name()
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_login_area(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
login_area_dict = query_user_login_area(start_time, end_time)
save_dwd_user_login_area(login_area_dict)
row = save_result('tamp_data_dwd', 'save_dwd_user_login_area', login_area_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'save_dwd_user_login_area', data_dt, row, 'done', task_file, now_time)
def query_user_login_area(start_time, end_time):
......@@ -90,8 +94,8 @@ if __name__ == '__main__':
# dwd_user_login_area()
# dwd_user_login_environment(data_dt)
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 17)
end = datetime.date(2021, 9, 17)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
......@@ -5,22 +5,27 @@
用户登录环境,定时任务,每2个小时运行一次
'''
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# file_path = get_file_path()
# file_name = get_file_name()
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_login_environment(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
login_env_dict = query_user_login_env(start_time, end_time)
save_dwd_user_login_env(login_env_dict)
row = save_result('tamp_data_dwd', 'dwd_user_login_environment', login_env_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_login_environment', data_dt, row, 'done', task_file, now_time)
# 查询用户登录环境
......@@ -92,8 +97,8 @@ def save_dwd_user_login_env(ret):
if __name__ == '__main__':
# dwd_user_login_environment(data_dt)
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 16)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
# -*- coding: utf-8 -*-
'''
用户首次登录pc的记录,定时任务,每天运行一次
后面废弃
'''
import logging
import os
import sys
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_login_pc_record(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
exist_users_dict = query_exist_users()
new_users_dict = query_new_pc_users(start_time, end_time, exist_users_dict)
row = save_result('tamp_data_dwd', 'dwd_user_login_pc_record', new_users_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_login_pc_record', data_dt, row, 'done', task_file, now_time)
# 查询已经存在的pc用户
def query_exist_users():
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = 'select user_id from tamp_data_dwd.dwd_user_login_pc_record'
exist_users_dict = fetch_all(sql, None)
logging.info(f'{function_name} success')
return exist_users_dict
def query_new_pc_users(start_time, end_time, exist_users_dict):
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = f'''
select p.data_dt
,p.user_id
,coalesce(t.team_id, 0) as team_id
,p.first_time
from
(
select `uid` as user_id
,min(date_format(server_time,'%%Y-%%m-%%d')) as data_dt
,min(server_time) as first_time
from tamp_analysis.access_history_log
where server_time between %s and %s
and env = 'PCManager'
and event_type = '1002'
and `uid` <> ''
and `uid` not in ({','.join(["'%s'" % item['user_id'] for item in exist_users_dict])})
group by `uid`
) p
left join tamp_analysis.user_info_view t
on p.user_id = t.user_id
'''
new_users_dict = fetch_all(sql, (start_time, end_time))
logging.info(f'{function_name} success')
return new_users_dict
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 9, 18)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dwd_user_login_pc_record(data_dt)
data_dt += delta
......@@ -5,15 +5,18 @@
用户登录手机型号,定时任务,每2个小时运行一次
'''
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# file_path = get_file_path()
# file_name = get_file_name()
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_login_phone_mode(data_dt):
......@@ -21,6 +24,9 @@ def dwd_user_login_phone_mode(data_dt):
end_time = str(data_dt) + ' 23:59:59'
login_phone_dict = query_user_login_phone_mode(start_time, end_time)
save_dwd_user_login_phone_mode(login_phone_dict)
row = save_result('tamp_data_dwd', 'dwd_user_login_phone_mode', login_phone_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_login_phone_mode', data_dt, row, 'done', task_file, now_time)
def query_user_login_phone_mode(start_time, end_time):
......
......@@ -28,12 +28,14 @@
4043 分享产品首页 产品
'''
import logging
import os
import sys
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_share_event(data_dt):
......@@ -41,7 +43,10 @@ def dwd_user_share_event(data_dt):
end_time = str(data_dt) + ' 23:59:59'
share_dict = query_share_event()
user_share_event_dict = query_dwd_user_share_event(share_dict, start_time, end_time)
save_dwd_user_share_event(user_share_event_dict)
# save_dwd_user_share_event(user_share_event_dict)
row = save_result('tamp_data_dwd', 'dwd_user_share_event', user_share_event_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_share_event', data_dt, row, 'done', task_file, now_time)
def query_share_event():
......
This diff is collapsed.
......@@ -6,9 +6,11 @@
'''
import logging
import os
import sys
from common.mysql_uitl import fetch_all, insert_batch, save_result
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
'''
p1005 直播详情页
......@@ -40,13 +42,17 @@ p13502 个人主页(访问发圈子用户的主页)
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_visitor_clues(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
visitor_record_dict = query_dwd_user_visitor_record(start_time, end_time)
save_dwd_user_visit_clues(visitor_record_dict)
# save_dwd_user_visit_clues(visitor_record_dict)
row = save_result('tamp_data_dwd', 'dwd_user_visit_clues', visitor_record_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_visit_clues', data_dt, row, 'done', task_file, now_time)
def query_dwd_user_visitor_record(start_time, end_time):
......
......@@ -8,19 +8,23 @@ import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# file_path = get_file_path()
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_watch_live(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
user_watch_dict = query_dwd_user_watch_live(start_time, end_time)
save_dwd_user_watch_live(user_watch_dict)
# save_dwd_user_watch_live(user_watch_dict)
row = save_result('tamp_data_dwd', 'dwd_user_watch', user_watch_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_watch', data_dt, row, 'done', task_file, now_time)
def query_dwd_user_watch_live(start_time, end_time):
......
......@@ -7,19 +7,21 @@
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch, save_result
from common.mysql_uitl import fetch_all, save_result, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# file_path = get_file_path()
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dwd_user_watch_short_video(data_dt):
start_time = str(data_dt) + ' 00:00:00'
end_time = str(data_dt) + ' 23:59:59'
watch_short_video_dict = query_dwd_user_watch_short_video(start_time, end_time)
save_result('tamp_data_dwd', 'dwd_user_watch_short_video', watch_short_video_dict, file_name)
row = save_result('tamp_data_dwd', 'dwd_user_watch_short_video', watch_short_video_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dwd', 'dwd_user_watch_short_video', data_dt, row, 'done', task_file, now_time)
def query_dwd_user_watch_short_video(start_time, end_time):
......@@ -74,8 +76,8 @@ def query_dwd_user_watch_short_video(start_time, end_time):
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 15)
end = datetime.date(2021, 9, 21)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
'''
用户观看直播明细数据统计,定时任务,每2个小时运行一次
......@@ -10,11 +12,14 @@ from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_browse_fund(data_dt):
browse_fund_dict = query_dws_user_browse_fund(data_dt)
save_result('tamp_data_dws', 'dws_user_browse_fund', browse_fund_dict, file_name)
row = save_result('tamp_data_dws', 'dws_user_browse_fund', browse_fund_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_browse_fund', data_dt, row, 'done', task_file, now_time)
def query_dws_user_browse_fund(data_dt):
......
# -*- coding: utf-8 -*-
import logging
import sys
from common.mysql_uitl import fetch_all
'''
用户分享事件统计,定时任务,每2个小时运行一次
现在只有产品/直播/课程的线索
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
def dws_user_clues(data_dt):
user_clues_dict = query_user_fund_clues()
save_user_clues(user_clues_dict)
def query_user_fund_clues():
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 有的数据,有访问,却没有分享
# 是不是要建视图
sql = '''
select p.data_dt as share_dt
,t.data_dt as visitor_dt
,p.user_id
,p.real_name
,p.user_name
,p.nickname
,p.team_id
,p.res_id
,p.event_type
,p.extra_id
,p.source_user_id
,p.source_uuid_id
,p.local_time as share_time
,t.local_time as visitor_time
from tamp_data_dwd.dwd_user_share_event p
left join tamp_data_dwd.dwd_user_visit_clues t
on p.source_user_id = t.source_user_id
and p.source_uuid_id = t.source_uuid_id
and p.res_id = t.res_id
'''
user_clues_dict = fetch_all(sql, None)
logging.info(f'{function_name} success')
return user_clues_dict
def save_user_clues(user_clues_dict):
sql = '''
'''
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dws_user_clues(data_dt)
data_dt += delta
......@@ -5,45 +5,53 @@
内容订单数据(全量同步,订单量多了,再用增量),定时任务,每10分钟运行一次
'''
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result
from common.mysql_uitl import fetch_all, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_content_order():
def dws_user_content_order(data_dt):
# 新课程订单
course_order_dict = query_dws_course_order()
save_result('tamp_data_dws', 'dws_user_content_order', course_order_dict, file_name)
row1 = save_result('tamp_data_dws', 'dws_user_content_order', course_order_dict, file_name)
# 老课程订单
old_course_order_dict = query_dws_old_course_order()
save_result('tamp_data_dws', 'dws_user_content_order', old_course_order_dict, file_name)
row2 = save_result('tamp_data_dws', 'dws_user_content_order', old_course_order_dict, file_name)
# 购买探普贝
recharge_order_dict = query_dws_recharge_order()
save_result('tamp_data_dws', 'dws_user_content_order', recharge_order_dict, file_name)
row3 = save_result('tamp_data_dws', 'dws_user_content_order', recharge_order_dict, file_name)
# 直播订单
live_order_dict = query_dws_live_order()
save_result('tamp_data_dws', 'dws_user_content_order', live_order_dict, file_name)
row4 = save_result('tamp_data_dws', 'dws_user_content_order', live_order_dict, file_name)
# 购买栏目订单
column_order_dict = query_dws_column_order()
save_result('tamp_data_dws', 'dws_user_content_order', column_order_dict, file_name)
row5 = save_result('tamp_data_dws', 'dws_user_content_order', column_order_dict, file_name)
# 购买新课程课件
course_ware_order_dict = query_dws_course_ware_order()
save_result('tamp_data_dws', 'dws_user_content_order', course_ware_order_dict, file_name)
row6 = save_result('tamp_data_dws', 'dws_user_content_order', course_ware_order_dict, file_name)
# 购买附件
file_order_dict = query_dws_file_order()
save_result('tamp_data_dws', 'dws_user_content_order', file_order_dict, file_name)
row7 = save_result('tamp_data_dws', 'dws_user_content_order', file_order_dict, file_name)
# 线下活动订单
offline_activity_order_dict = query_dws_offline_activity_order()
save_result('tamp_data_dws', 'dws_user_content_order', offline_activity_order_dict, file_name)
row8 = save_result('tamp_data_dws', 'dws_user_content_order', offline_activity_order_dict, file_name)
row = row1 + row2 + row3 + row4 + row5 + row6 + row7 + row8
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_content_order', data_dt, row, 'done', task_file, now_time)
def query_dws_course_order():
......
# -*- coding: utf-8 -*-
'''
用户首次登录客户端时间,定时任务,每天运行一次
'''
import logging
import sys
from common.mysql_uitl import fetch_all, insert_batch, save_result
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.ERROR)
file_name = sys.argv[0]
def dws_user_first_login_client_time(data_dt):
login_time_dict = query_first_login_time(data_dt)
save_user_login_time(login_time_dict)
# update_user_name()
def query_first_login_time(data_dt):
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
sql = '''
select user_id
,real_name
,user_name
,nickname
,team_id
,min(if(env = 'ios',start_time,null)) as login_ios_time
,min(if(env = 'android',start_time, null)) as login_android_time
,min(if(env = 'PCManager',start_time, null)) as login_pc_time
,min(if(env = 'wechat',start_time, null)) as login_wechat_time
,min(if(env = 'xcx',start_time, null)) as login_xcx_time
from tamp_data_dwd.dwd_user_login_environment
where data_dt = %s
group by user_id, real_name, user_name, nickname, team_id
'''
exist_users_dict = fetch_all(sql, data_dt)
logging.info(f'{function_name} success')
return exist_users_dict
def save_user_login_time(login_time_dict):
if login_time_dict:
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
database = 'tamp_data_dws'
table = 'dws_user_first_login_client_time'
result_table = database + '.' + table
field_list = ['login_wechat_time', 'login_ios_time', 'login_android_time', 'login_pc_time', 'login_xcx_time']
for field in field_list:
result_list = list()
client_dict_list = list()
for i in login_time_dict:
if i[field]:
result_list.append([i['real_name'], i['user_name'], i['nickname'], i['team_id'], i[field], i['user_id']])
client_dict_list.append(i)
sql = f'''select user_id
,real_name
,user_name
,nickname
,team_id
,{field} from {result_table}
where user_id in ({','.join(["'%s'" % item['user_id'] for item in login_time_dict])})
and {field} is not null '''
exist_users_dict = fetch_all(sql, None)
update = f'''update {result_table} set real_name = (%s), user_name = (%s), nickname = (%s), team_id = (%s),
{field} = (%s) where user_id = (%s)'''
# 已经存在的用户,则更新另外一个客户端登录的时间
insert_batch(update, result_list)
# 不存在的用户,则插入新数据
not_exist_users_dict = [x for x in client_dict_list if x not in exist_users_dict]
save_result(database, table, not_exist_users_dict, file_name)
logging.info(f'{function_name} success')
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 9, 17)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dws_user_first_login_client_time(data_dt)
data_dt += delta
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
'''
用户学习课程明细/汇总数据统计,定时任务,每2个小时运行一次
......@@ -12,6 +14,7 @@ from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_learn_course(data_dt):
......@@ -20,8 +23,9 @@ def dws_user_learn_course(data_dt):
learn_course_dict = query_dws_user_learn_course(data_dt)
merge_share_course_dict = merge_share_course(share_single_course_dict, share_course_dict)
merge_course_result_dict = merge_course_dict(learn_course_dict, merge_share_course_dict)
save_result('tamp_data_dws', 'dws_user_learn_course', merge_course_result_dict, file_name)
row = save_result('tamp_data_dws', 'dws_user_learn_course', merge_course_result_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_learn_course', data_dt, row, 'done', task_file, now_time)
# 分享单节课程
def query_dws_user_share_single_course_num(data_dt):
......
......@@ -6,19 +6,25 @@
'''
import json
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_learn_total_dur():
def dws_user_learn_total_dur(data_dt):
user_learn_total_dur_dict = query_dws_user_learn_total_dur()
save_dws_user_learn_total_dur(user_learn_total_dur_dict)
# save_dws_user_learn_total_dur(user_learn_total_dur_dict)
row = save_result('tamp_data_dws', 'dws_user_learn_total_dur', user_learn_total_dur_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_learn_total_dur', data_dt, row, 'done', task_file, now_time)
# 数据量大量,需要用增加的方式查询,增加一张历史统计记录表,增数据+历史数据=汇总数据
......
......@@ -5,19 +5,25 @@
用户登录环境,定时任务,每2个小时运行一次
'''
import logging
import os
import sys
from common.mysql_uitl import fetch_all, insert_batch, update_batch, fetch_all_list
from common.mysql_uitl import fetch_all, insert_batch, update_batch, fetch_all_list, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_login_environment():
def dws_user_login_environment(data_dt):
login_env_dict = query_dws_user_login_env()
save_dws_user_login_environment(login_env_dict)
# save_dws_user_login_environment(login_env_dict)
row = save_result('tamp_data_dws', 'dws_user_login_environment', login_env_dict, file_name)
dws_user_recent_login_env_list = query_dws_user_recent_login_env()
update_dws_user_recent_login_env(dws_user_recent_login_env_list)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_login_environment', data_dt, row, 'done', task_file, now_time)
def query_dws_user_login_env():
......
......@@ -5,18 +5,25 @@
用户登录环境,定时任务,每2个小时运行一次
'''
import logging
import os
import sys
from common.file_uitil import get_file_path, get_file_name
from common.mysql_uitl import fetch_all, insert_batch, fetch_all_list
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from common.time_util import now_str, YMDHMS_FORMAT
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_login_phone_mode():
def dws_user_login_phone_mode(data_dt):
login_phone_mode_dict = query_user_use_commonly_phone()
save_dws_user_login_phone_mode(login_phone_mode_dict)
# save_dws_user_login_phone_mode(login_phone_mode_dict)
row = save_result('tamp_data_dws', 'dws_user_login_phone_mode', login_phone_mode_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_login_phone_mode', data_dt, row, 'done', task_file, now_time)
def query_user_use_commonly_phone():
......
......@@ -6,9 +6,13 @@
'''
import json
import logging
import os
import sys
from common.mysql_uitl import fetch_all, insert_batch
from common.mysql_uitl import fetch_all, insert_batch, save_result, save_etl_log
from datetime import datetime
from common.time_util import YMDHMS_FORMAT, now_str
'''
用户最近60天登陆最多的地区
......@@ -16,17 +20,19 @@ from common.mysql_uitl import fetch_all, insert_batch
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_login_top_area(data_dt):
# 计算偏移量
offset = datetime.timedelta(days=-60)
# 获取想要的日期的时间
start_date = (data_dt + offset).strftime('%Y-%m-%d')
start_date = (datetime.datetime.strptime(str(data_dt), "%Y-%m-%d") - datetime.timedelta(days=60)).strftime("%Y-%m-%d")
end_date = data_dt
login_area_dict = query_login_area(start_date, end_date)
login_top_area_dict = get_top_area(login_area_dict)
save_dws_user_login_area(login_top_area_dict)
# row = save_dws_user_login_area(login_top_area_dict)
row = save_result('tamp_data_dws', 'dws_user_login_top_area', login_top_area_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_login_top_area', data_dt, row, 'done', task_file, now_time)
def query_login_area(start_date, end_date):
......@@ -51,7 +57,6 @@ def query_login_area(start_date, end_date):
) p
group by p.user_id
'''
print(start_date, end_date)
login_area_dict = fetch_all(sql, (start_date, end_date))
logging.info(f'query_login_area success')
return login_area_dict
......@@ -92,18 +97,20 @@ def save_dws_user_login_area(ret):
values.append(tuple(i.values()))
sql = f'''replace into tamp_data_dws.dws_user_login_top_area ( {fields} ) values ( {place_holder} )'''
rs = insert_batch(sql, values)
logging.info(str(rs))
# if rs == row or rs == 2 * row: # 因为这里用的是replace
if rs >= row: # 因为有部分数据是覆盖
logging.info(f'save_dws_user_login_area success {row}')
else:
logging.error(f'save_dws_user_login_area error 数据为:{row}行,插入成功为:{rs} 行 执行程序为:{file_name}')
return row
if __name__ == '__main__':
# dws_user_login_area()
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 17)
end = datetime.date(2021, 9, 18)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
# -*- coding: utf-8 -*-
import logging
import sys
'''
访问线索统计
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
def dws_user_visitor_clues(data_dt):
pass
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 9, 7)
end = datetime.date(2021, 9, 7)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
print(data_dt.strftime("%Y-%m-%d"))
dws_user_visitor_clues(data_dt)
data_dt += delta
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import now, now_str, YMDHMS_FORMAT
'''
用户观看直播明细数据统计,定时任务,每2个小时运行一次
......@@ -10,13 +12,18 @@ from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_watch_live(data_dt):
share_live_dict = query_dws_user_share_live_num(data_dt)
watch_live_dict = query_dws_user_watch_live(data_dt)
merge_live_result_dict = merge_live_dict(watch_live_dict, share_live_dict)
save_result('tamp_data_dws', 'dws_user_watch_live', merge_live_result_dict, file_name)
row = save_result('tamp_data_dws', 'dws_user_watch_live', merge_live_result_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_watch_live', data_dt, row, 'done', task_file, now_time)
# 加依赖关系,也需要自动配置
# 分享直播
......@@ -110,12 +117,16 @@ def merge_live_dict(watch_live_dict, share_live_dict):
logging.info(f'{function_name} success')
return merge_live_result
#
# def save_etl_log(row, data_dt):
# sql = '''insert into tamp_data_dwd.dwd_etl_log (data_dt, )
# '''
if __name__ == '__main__':
import datetime
begin = datetime.date(2021, 4, 1)
end = datetime.date(2021, 9, 14)
begin = datetime.date(2021, 9, 21)
end = datetime.date(2021, 9, 22)
data_dt = begin
delta = datetime.timedelta(days=1)
while data_dt <= end:
......
# -*- coding: utf-8 -*-
import logging
import os
import sys
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
from common.mysql_uitl import fetch_all, save_result, insert_batch, insert, save_etl_log
from common.time_util import YMDHMS_FORMAT, now_str
'''
用户观看短视频明细数据统计,定时任务,每2个小时运行一次
......@@ -10,13 +12,16 @@ from common.mysql_uitl import fetch_all, save_result, insert_batch, insert
'''
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
task_file = os.path.split(__file__)[-1].split(".")[0]
def dws_user_watch_short_video(data_dt):
share_short_video_dict = query_dws_user_share_short_video_num(data_dt)
watch_short_video_dict = query_dws_user_watch_short_video(data_dt)
merge_short_video_result_dict = merge_short_video_dict(watch_short_video_dict, share_short_video_dict)
save_result('tamp_data_dws', 'dws_user_watch_short_video', merge_short_video_result_dict, file_name)
row = save_result('tamp_data_dws', 'dws_user_watch_short_video', merge_short_video_result_dict, file_name)
now_time = now_str(YMDHMS_FORMAT)
save_etl_log('tamp_data_dws', 'dws_user_watch_short_video', data_dt, row, 'done', task_file, now_time)
# 分享短视频
......
......@@ -2,8 +2,6 @@
import logging
import sys
from edw.ads.user.ads_user_content_order_records import ads_user_content_order_records
from edw.dwd.user.dwd_user_content_order import dwd_user_content_order
from edw.dws.user.dws_user_content_order import dws_user_content_order
'''
......@@ -14,6 +12,20 @@ from edw.dws.user.dws_user_content_order import dws_user_content_order
from apscheduler.schedulers.blocking import BlockingScheduler
from common.time_util import get_run_time
from edw.ads.basic.ads_app_install import ads_app_install
from edw.ads.basic.ads_platform_active import ads_platform_active
from edw.ads.basic.ads_user_level_active import ads_user_level_active
from edw.ads.user.ads_user_content_order_records import ads_user_content_order_records
from edw.ads.user.ads_user_watch_short_video import ads_user_watch_short_video
from edw.dwd.basic.dwd_app_install import dwd_app_install
from edw.dwd.user.dwd_user_content_order import dwd_user_content_order
from edw.dwd.user.dwd_user_first_login_client_time import dwd_user_first_login_client_time
from edw.dwd.user.dwd_user_login_pc_record import dwd_user_login_pc_record
from edw.dwd.user.dwd_user_studio_add_content import dwd_user_studio_add_content
from edw.dwd.user.dwd_user_watch_short_video import dwd_user_watch_short_video
from edw.dws.user.dws_user_first_login_client_time import dws_user_first_login_client_time
from edw.dws.user.dws_user_watch_short_video import dws_user_watch_short_video
from edw.ads.user.ads_user_basic_behavior import ads_user_basic_behavior
from edw.ads.user.ads_user_browse_fund import ads_user_browse_fund
from edw.ads.user.ads_user_learn_course import ads_user_learn_course
......@@ -32,9 +44,9 @@ from edw.dws.user.dws_user_learn_total_dur import dws_user_learn_total_dur
from edw.dws.user.dws_user_login_top_area import dws_user_login_top_area
from edw.dws.user.dws_user_login_environment import dws_user_login_environment
from edw.dws.user.dws_user_login_phone_mode import dws_user_login_phone_mode
from edw.dws.user.dws_user_visitor_clues import dws_user_visitor_clues
from edw.dws.user.dws_user_watch_live import dws_user_watch_live
from edw.ods.user.ods_users_info import ods_users_info
from edw.dwd.user.dwd_user_community_res import dwd_user_community_res
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
......@@ -49,30 +61,42 @@ data_dt = run_hour_time[0: 10]
def dwd_task_minute():
'''
按分钟定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 内容订单数据
dwd_user_content_order()
dwd_user_content_order(data_dt)
logging.info(f'{function_name} end')
def dws_task_minute():
'''
按分钟定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 内容订单数据
dws_user_content_order()
dws_user_content_order(data_dt)
logging.info(f'{function_name} end')
def ads_task_minute():
'''
按分钟定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 内容订单数据
ads_user_content_order_records()
ads_user_content_order_records(data_dt)
logging.info(f'{function_name} end')
def ods_task_hour():
'''
按小时定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 用户信息
......@@ -80,41 +104,93 @@ def ods_task_hour():
logging.info(f'{function_name} end')
def dwd_task_hour():
'''
按小时定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
dwd_user_watch_live(data_dt)
dwd_user_browse_fund(data_dt)
dwd_user_learn_course(data_dt)
dwd_user_community_res(data_dt)
dwd_user_login_area(data_dt)
dwd_user_learn_course(data_dt)
dwd_user_login_environment(data_dt)
dwd_user_login_phone_mode(data_dt)
dwd_user_share_event(data_dt)
dwd_user_studio_add_content(data_dt)
dwd_user_visitor_clues(data_dt)
dwd_user_watch_live(data_dt)
dwd_user_watch_short_video(data_dt)
logging.info(f'{function_name} end')
def dws_task_hour():
'''
按小时定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
dws_user_browse_fund(data_dt)
dws_user_learn_course(data_dt)
dws_user_learn_total_dur()
dws_user_login_top_area(data_dt)
dws_user_login_environment()
dws_user_login_phone_mode()
dws_user_visitor_clues(data_dt)
dws_user_login_phone_mode(data_dt)
dws_user_learn_total_dur(data_dt)
dws_user_first_login_client_time(data_dt)
dws_user_login_environment(data_dt)
dws_user_learn_course(data_dt)
dws_user_browse_fund(data_dt)
dws_user_watch_live(data_dt)
dws_user_watch_short_video(data_dt)
logging.info(f'{function_name} end')
def ads_task_hour():
'''
按小时定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
ads_user_basic_behavior()
ads_user_basic_behavior(data_dt)
ads_user_browse_fund(data_dt)
ads_user_learn_course(data_dt)
ads_user_watch_live(data_dt)
ads_user_watch_short_video(data_dt)
ads_user_content_order_records(data_dt)
logging.info(f'{function_name} end')
def dwd_task_day():
'''
按天定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
dwd_app_install(data_dt)
dwd_user_login_pc_record(data_dt)
dwd_user_first_login_client_time(data_dt)
logging.info(f'{function_name} end')
def dws_task_day():
'''
按天定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
logging.info(f'{function_name} end')
def ads_task_day():
'''
按天定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
ads_user_level_active(data_dt)
ads_platform_active(data_dt)
ads_app_install(data_dt)
logging.info(f'{function_name} end')
......@@ -128,10 +204,12 @@ if __name__ == '__main__':
# 两个小时定时调度
# scheduler.add_job(ods_task_hour, "interval", hours=2)
scheduler.add_job(dwd_task_hour, "interval", hours=2)
scheduler.add_job(dws_task_hour, "interval", hours=2)
scheduler.add_job(ads_task_hour, "interval", hours=2)
# 按天调度
# scheduler.add_job(scheduler_day_job, "interval", minutes=20)
scheduler.add_job(dwd_task_hour, "interval", minutes=30)
scheduler.add_job(dws_task_hour, "interval", minutes=30)
scheduler.add_job(ads_task_hour, "interval", minutes=30)
# 按天定时任务
scheduler.add_job(dwd_task_day, "interval", hours=1)
scheduler.add_job(dws_task_day, "interval", hours=1)
scheduler.add_job(ads_task_day, "interval", hours=1)
scheduler.start()
# -*- coding: utf-8 -*-
import logging
import sys
from edw.ads.basic.ads_app_install import ads_app_install
from edw.ads.user.ads_user_content_order_records import ads_user_content_order_records
from edw.dwd.basic.dwd_app_install import dwd_app_install
from edw.dwd.user.dwd_user_content_order import dwd_user_content_order
from edw.dwd.user.dwd_user_login_pc_record import dwd_user_login_pc_record
from edw.dws.user.dws_user_content_order import dws_user_content_order
'''
调度任务,分层ods,dwd,dws,ads 分层调度
调度又根据业务需求,分为按小时调度和按天调度
'''
from apscheduler.schedulers.blocking import BlockingScheduler
from common.time_util import get_run_time
from edw.ads.user.ads_user_basic_behavior import ads_user_basic_behavior
from edw.ads.user.ads_user_browse_fund import ads_user_browse_fund
from edw.ads.user.ads_user_learn_course import ads_user_learn_course
from edw.ads.user.ads_user_watch_live import ads_user_watch_live
from edw.dwd.user.dwd_user_browse_fund import dwd_user_browse_fund
from edw.dwd.user.dwd_user_learn_course import dwd_user_learn_course
from edw.dwd.user.dwd_user_login_area import dwd_user_login_area
from edw.dwd.user.dwd_user_login_environment import dwd_user_login_environment
from edw.dwd.user.dwd_user_login_phone_mode import dwd_user_login_phone_mode
from edw.dwd.user.dwd_user_share_event import dwd_user_share_event
from edw.dwd.user.dwd_user_visitor_clues import dwd_user_visitor_clues
from edw.dwd.user.dwd_user_watch_live import dwd_user_watch_live
from edw.dws.user.dws_user_browse_fund import dws_user_browse_fund
from edw.dws.user.dws_user_learn_course import dws_user_learn_course
from edw.dws.user.dws_user_learn_total_dur import dws_user_learn_total_dur
from edw.dws.user.dws_user_login_top_area import dws_user_login_top_area
from edw.dws.user.dws_user_login_environment import dws_user_login_environment
from edw.dws.user.dws_user_login_phone_mode import dws_user_login_phone_mode
from edw.dws.user.dws_user_visitor_clues import dws_user_visitor_clues
from edw.dws.user.dws_user_watch_live import dws_user_watch_live
from edw.ods.user.ods_users_info import ods_users_info
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
file_name = sys.argv[0]
# 按分钟定时
run_minute_time = get_run_time(30) # 滞后30分钟,去取时间。时间取值范围有全量,还有按天计算的(按天计算时,时间范围取值都是从0点~24点)
data_minute_dt = run_minute_time[0: 10]
# 延迟2.5 小时,去取数,因为定时任务是每隔2个小时运行
run_hour_time = get_run_time(60 * 2.5)
data_dt = run_hour_time[0: 10]
def dwd_task_minute():
'''
按分钟定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 内容订单数据
dwd_user_content_order()
logging.info(f'{function_name} end')
def dws_task_minute():
'''
按分钟定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 内容订单数据
dws_user_content_order()
logging.info(f'{function_name} end')
def ads_task_minute():
'''
按分钟定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 内容订单数据
ads_user_content_order_records()
logging.info(f'{function_name} end')
def ods_task_hour():
'''
按小时定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
# 用户信息
ods_users_info()
logging.info(f'{function_name} end')
def dwd_task_hour():
'''
按小时定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
dwd_user_watch_live(data_dt)
dwd_user_browse_fund(data_dt)
dwd_user_learn_course(data_dt)
dwd_user_login_area(data_dt)
dwd_user_login_environment(data_dt)
dwd_user_login_phone_mode(data_dt)
dwd_user_share_event(data_dt)
dwd_user_visitor_clues(data_dt)
logging.info(f'{function_name} end')
def dws_task_hour():
'''
按小时定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
dws_user_browse_fund(data_dt)
dws_user_learn_course(data_dt)
dws_user_learn_total_dur()
dws_user_login_top_area(data_dt)
dws_user_login_environment()
dws_user_login_phone_mode()
dws_user_visitor_clues(data_dt)
dws_user_watch_live(data_dt)
logging.info(f'{function_name} end')
def ads_task_hour():
'''
按小时定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
ads_user_basic_behavior(data_dt)
ads_user_browse_fund(data_dt)
ads_user_learn_course(data_dt)
ads_user_watch_live(data_dt)
logging.info(f'{function_name} end')
def dwd_task_day():
'''
按天定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
dwd_app_install(data_dt)
dwd_user_login_pc_record(data_dt)
logging.info(f'{function_name} end')
def dws_task_day():
'''
按天定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
logging.info(f'{function_name} end')
def ads_task_day():
'''
按天定时调度任务
'''
function_name = sys._getframe().f_code.co_name
logging.info(f'{function_name} start')
ads_app_install(data_dt)
logging.info(f'{function_name} end')
if __name__ == '__main__':
scheduler = BlockingScheduler()
# 按分定时调度
scheduler.add_job(dwd_task_minute, "interval", minutes=10)
scheduler.add_job(dws_task_minute, "interval", minutes=10)
scheduler.add_job(ads_task_minute, "interval", minutes=10)
# 两个小时定时调度
# scheduler.add_job(ods_task_hour, "interval", hours=2)
scheduler.add_job(dwd_task_hour, "interval", hours=2)
scheduler.add_job(dws_task_hour, "interval", hours=2)
scheduler.add_job(ads_task_hour, "interval", hours=2)
# 按天调度
# scheduler.add_job(scheduler_day_job, "interval", minutes=20)
# 按天定时任务
scheduler.add_job(dwd_task_day, "interval", hours=3)
scheduler.add_job(dws_task_day, "interval", hours=3)
scheduler.add_job(ads_task_day, "interval", hours=3)
scheduler.start()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment