在现有的 api_device_water_usage_statistics
接口中,用水量数据以立方米(方)为单位返回。为了提高数据的可读性和符合业务需求,需要将用水量单位改为“万方”(即10,000立方米)。本文将提供一个全面的指南,详细说明如何在现有代码中进行必要的修改,以实现这一转换。
当前的 API 接口主要执行以下步骤:
search_time
、statistics_type
以及位置 ID。为了将用水量转换为“万方”,需要在以下几个关键位置进行修改:
1 万方等于 10,000 立方米。因此,需要在计算用水量时,将立方米数除以 10,000。
在处理日统计数据的循环中,进行单位转换如下:
for current_day in (first_day + timedelta(days=i) for i in range((last_day - first_day).days + 1)):
water_use = daily_usage.get(current_day, decimal.Decimal('0.000')) # 无数据时填充 0
daily_water_usage[current_day] = water_use / decimal.Decimal('10000') # 转换为万方
total_water_use += water_use / decimal.Decimal('10000') # 转换总用水量为万方
同时,更新响应数据:
device_statistics_data = {
"water_usage": [water_use for water_use in daily_water_usage.values()],
"total": total_water_use,
}
在处理月统计数据的循环中,进行单位转换如下:
for current_month in (datetime.date(search_date.year, month, 1) for month in range(1, 13)):
water_use = monthly_usage.get(current_month, decimal.Decimal('0.000')) # 无数据时填充 0
monthly_water_usage[current_month] = water_use / decimal.Decimal('10000') # 转换为万方
total_water_use += water_use / decimal.Decimal('10000') # 转换总用水量为万方
同时,更新响应数据:
device_statistics_data = {
"water_usage": [water_use for water_use in monthly_water_usage.values()],
"total": total_water_use,
}
确保在响应中,所有涉及用水量的字段都已进行单位转换。具体来说:
water_usage
列表中的所有值均已转换为万方。total
字段的值已转换为万方。例如:
device_statistics_data = {
"water_usage": [water_use for water_use in daily_water_usage.values()], # 已经是万方
"total": total_water_use, # 已经是万方
}
以下是完整修改后的代码示例,包括日统计和月统计部分的用水量单位转换:
@custom_require_http_methods(['GET'])
@json_request_handler
@token_validation_handler
def api_device_water_usage_statistics(request):
"""用水量统计接口"""
# 按时间查找
search_time = request.json_data.get('search_time') # 查询时间 2024 或 2024-3
# 字段校验
if not search_time:
return error_response('请指定查询时间')
device_query = DeviceInfoModel.objects.all()
statistics_type = request.json_data.get('statistics_type') or '日' # 统计类型 月/日 默认为日
if not statistics_type:
return error_response('请指定查询类型')
# 根据位置查询
location_l1_id = request.json_data.get('location_l1_id') # 一级位置信息
location_l2_id = request.json_data.get('location_l2_id') # 二级位置信息
location_l3_id = request.json_data.get('location_l3_id') # 三级位置信息
location = None
if location_l3_id:
location = Location.objects.filter(id=location_l3_id).first()
elif location_l2_id:
location = Location.objects.filter(id=location_l2_id).first()
elif location_l1_id:
location = Location.objects.filter(id=location_l1_id).first()
if location_l3_id or location_l2_id or location_l1_id:
if not location:
return error_response(message='指定位置不存在')
if location:
if location_l3_id:
device_query = device_query.filter(location_id=location_l3_id)
elif location_l2_id or location_l1_id:
leaf_location_query = location.get_leafnodes()
device_query = device_query.filter(location__in=leaf_location_query)
if device_query.count() == 0:
return error_response('未查询到设备数据')
# 返回结果
if statistics_type == '日':
try:
# 将字符串转换为 datetime 对象
search_date = datetime.datetime.strptime(search_time, "%Y-%m")
except Exception as e:
return error_response('日期格式错误:%Y-%m {}'.format(e))
# 确定月份的第一天和最后一天
first_day = datetime.date(search_date.year, search_date.month, 1)
last_day = (datetime.date(search_date.year, search_date.month + 1, 1) - timedelta(days=1)
if search_date.month < 12
else datetime.date(search_date.year + 1, 1, 1) - timedelta(days=1))
# 查询设备数据
device_info_dict = {}
sql_query = """
SELECT
l1.`name` AS location_1_name,
l2.`name` AS location_2_name,
l3.`name` AS location_3_name,
owner_name,
well_number,
license_no,
water_permit_e_certificate_code,
device_id
FROM device_info_table
INNER JOIN location_table AS l3 ON device_info_table.location_id = l3.id
INNER JOIN location_table AS l2 ON l3.parent_id=l2.id
INNER JOIN location_table AS l1 ON l2.parent_id = l1.id
WHERE device_info_table.license_no IN ({})
""".format(','.join(["'{}'".format(device.license_no) for device in device_query]))
print(sql_query)
sql_results = execute_raw_sql_dict(sql_query)
for row in sql_results:
device_info_dict[row.get('license_no')] = row
# 查询所有相关数据
all_daily_data = (
DeviceDailyUsageModel.objects.filter(statistics_date__range=(first_day, last_day)).exclude(water_use__gt=100000000)
.values("license_no", "statistics_date")
.annotate(daily_water_use=Sum("water_use"))
)
# 按 license_no 分组
data_by_license_no = defaultdict(dict)
for entry in all_daily_data:
license_no = entry["license_no"]
statistics_date = entry["statistics_date"]
daily_water_use = entry["daily_water_use"]
data_by_license_no[license_no][statistics_date] = daily_water_use
# 设备数据初始化
specific = []
# 遍历所有设备并生成统计数据
for device in device_query:
daily_usage = data_by_license_no.get(device.license_no, {})
daily_water_usage = {}
total_water_use = 0
# 填补日期和计算总和
for current_day in (first_day + timedelta(days=i) for i in range((last_day - first_day).days + 1)):
water_use = daily_usage.get(current_day, decimal.Decimal('0.000')) # 无数据时填充 0
daily_water_usage[current_day] = water_use / decimal.Decimal('10000') # 转换为万方
total_water_use += water_use / decimal.Decimal('10000') # 转换总用水量为万方
device_statistics_data = {
"water_usage": [water_use for water_use in daily_water_usage.values()],
"total": total_water_use,
}
try:
device_statistics_data.update(device_info_dict.get(device.license_no)) # 合并设备其他字段
except Exception as e:
print(e)
print(device_statistics_data)
specific.append(device_statistics_data)
return success_response(data=specific)
else:
# 校验日期 月
# 假设传入的 search_time 为年份字符串,如 "2024"
search_date = datetime.datetime.strptime(search_time, "%Y")
# 确定年份的第一天和最后一天
first_day = datetime.date(search_date.year, 1, 1)
last_day = datetime.date(search_date.year, 12, 31)
# 查询设备数据
device_info_dict = {}
sql_query = """
SELECT
l1.`name` AS location_1_name,
l2.`name` AS location_2_name,
l3.`name` AS location_3_name,
owner_name,
well_number,
license_no,
water_permit_e_certificate_code,
device_id
FROM device_info_table
INNER JOIN location_table AS l3 ON device_info_table.location_id = l3.id
INNER JOIN location_table AS l2 ON l3.parent_id=l2.id
INNER JOIN location_table AS l1 ON l2.parent_id = l1.id
WHERE device_info_table.license_no IN ({})
""".format(','.join(["'{}'".format(device.license_no) for device in device_query]))
print(sql_query)
sql_results = execute_raw_sql_dict(sql_query)
for row in sql_results:
device_info_dict[row.get('license_no')] = row
# 查询所有相关数据
all_monthly_data = (
DeviceDailyUsageModel.objects.filter(statistics_date__range=(first_day, last_day)).exclude(water_use__gt=100000000)
.annotate(month=TruncMonth("statistics_date")) # 按月份截断日期
.values("license_no", "month")
.annotate(monthly_water_use=Sum("water_use"))
)
# 按 license_no 分组
data_by_license_no = defaultdict(dict)
for entry in all_monthly_data:
license_no = entry["license_no"]
month = entry["month"]
monthly_water_use = entry["monthly_water_use"]
data_by_license_no[license_no][month] = monthly_water_use
# 初始化结果
specific = []
# 遍历所有设备并生成统计数据
for device in device_query:
monthly_usage = data_by_license_no.get(device.license_no, {})
monthly_water_usage = {}
total_water_use = 0
# 按月填补数据
for current_month in (datetime.date(search_date.year, month, 1) for month in range(1, 13)):
water_use = monthly_usage.get(current_month, decimal.Decimal('0.000')) # 无数据时填充 0
monthly_water_usage[current_month] = water_use / decimal.Decimal('10000') # 转换为万方
total_water_use += water_use / decimal.Decimal('10000') # 转换总用水量为万方
device_statistics_data = {
"water_usage": [water_use for water_use in monthly_water_usage.values()],
"total": total_water_use,
}
try:
device_statistics_data.update(device_info_dict.get(device.license_no)) # 合并设备其他字段
except Exception as e:
print(e)
print(device_statistics_data)
specific.append(device_statistics_data)
return success_response(data=specific)
water_usage
和 total
字段现在以“万方”为单位。在完成代码修改后,进行以下测试以确保功能的正确性:
statistics_type
(如“日”或“月”)。search_time
(如“2024-03”或“2024”)。water_usage
和 total
字段是否已正确转换为万方。
def convert_to_wanfang(water_use):
return water_use / decimal.Decimal('10000')
通过上述步骤,我们成功地将 api_device_water_usage_statistics
接口中返回的用水量单位从“方”转换为“万方”。这种转换不仅提升了数据的可读性,也更符合实际业务需求。在实际开发过程中,类似的单位转换需求应遵循以下原则:
通过遵循这些最佳实践,可以确保 API 的可靠性和可维护性,为用户提供准确和有用的数据。