feat(dict) 分布式锁

This commit is contained in:
expressgy 2025-07-07 22:34:45 +08:00
parent cd49a78678
commit e8e352b6b6

View File

@ -13,6 +13,7 @@ import { sysDict } from '@/eneities';
import { eq, and, or, ne, desc, asc, sql, inArray, isNull, not, max } from 'drizzle-orm';
import { successResponse, BusinessError } from '@/utils/responseFormate';
import { nextId } from '@/utils/snowflake';
import { DistributedLockService } from '@/utils/distributedLock';
import type {
CreateDictRequest,
GetDictTreeByCodeRequest,
@ -43,106 +44,113 @@ export class DictService {
* @throws BusinessError
*/
public async createDict(body: CreateDictRequest): Promise<CreateDictSuccessType> {
// 1. code唯一性校验
const existCode = await db()
.select({ id: sysDict.id })
.from(sysDict)
.where(eq(sysDict.code, body.code))
.limit(1);
if (existCode.length > 0) {
throw new BusinessError(`字典代码已存在: ${body.code}`, 409);
}
// 2. name同级唯一性校验
const pid = body.pid || '0';
const existName = await db()
.select({ id: sysDict.id })
.from(sysDict)
.where(and(eq(sysDict.name, body.name), eq(sysDict.pid, pid)))
.limit(1);
if (existName.length > 0) {
throw new BusinessError(`字典名称已存在: ${body.name}`, 409);
}
const lockKey = `dict:create:code:${body.code}:name:${body.name}:pid:${pid}`;
const lock = await DistributedLockService.acquire({ key: lockKey, ttl: 10 });
// 3. 父级校验与层级处理
let level = 1;
if (pid !== '0') {
const parent = await db().select().from(sysDict).where(eq(sysDict.id, pid)).limit(1);
if (parent.length === 0) {
throw new BusinessError(`父级字典不存在: ${pid}`, 404);
}
if (parent[0]!.status !== 'active') {
throw new BusinessError(`父级字典状态非active: ${pid}`, 400);
}
level = parent[0]!.level + 1;
if (level > 10) {
throw new BusinessError(`字典层级超过限制: ${level}`, 400);
}
}
// 4. sortOrder处理同级最大+1
let sortOrder = 0;
if (body.sortOrder !== undefined) {
sortOrder = body.sortOrder;
} else {
const maxSort = await db()
.select({ maxSort: max(sysDict.sortOrder) })
try {
// 1. code唯一性校验
const existCode = await db()
.select({ id: sysDict.id })
.from(sysDict)
.where(eq(sysDict.pid, pid));
sortOrder = (maxSort[0]?.maxSort ?? 0) + 1;
}
.where(eq(sysDict.code, body.code))
.limit(1);
if (existCode.length > 0) {
throw new BusinessError(`字典代码已存在: ${body.code}`, 409);
}
// 5. 数据写入
const dictId = nextId();
// 2. name同级唯一性校验
const existName = await db()
.select({ id: sysDict.id })
.from(sysDict)
.where(and(eq(sysDict.name, body.name), eq(sysDict.pid, pid)))
.limit(1);
if (existName.length > 0) {
throw new BusinessError(`字典名称已存在: ${body.name}`, 409);
}
await db()
.insert(sysDict)
.values([
// 3. 父级校验与层级处理
let level = 1;
if (pid !== '0') {
const parent = await db().select().from(sysDict).where(eq(sysDict.id, pid)).limit(1);
if (parent.length === 0) {
throw new BusinessError(`父级字典不存在: ${pid}`, 404);
}
if (parent[0]!.status !== 'active') {
throw new BusinessError(`父级字典状态非active: ${pid}`, 400);
}
level = parent[0]!.level + 1;
if (level > 10) {
throw new BusinessError(`字典层级超过限制: ${level}`, 400);
}
}
// 4. sortOrder处理同级最大+1
let sortOrder = 0;
if (body.sortOrder !== undefined) {
sortOrder = body.sortOrder;
} else {
const maxSort = await db()
.select({ maxSort: max(sysDict.sortOrder) })
.from(sysDict)
.where(eq(sysDict.pid, pid));
sortOrder = (maxSort[0]?.maxSort ?? 0) + 1;
}
// 5. 数据写入
const dictId = nextId();
await db()
.insert(sysDict)
.values([
{
id: dictId.toString(),
code: body.code,
name: body.name,
value: body.value ?? null,
description: body.description ?? null,
icon: body.icon ?? null,
pid: BigInt(pid),
level,
sortOrder,
status: body.status,
isSystem: body.isSystem ? 1 : 0,
color: body.color ?? null,
extra: body.extra ?? {},
},
] as any);
// 6. 查询刚插入的数据
const insertedArr = await db().select().from(sysDict).where(eq(sysDict.id, dictId.toString())).limit(1);
if (!insertedArr || insertedArr.length === 0) {
throw new BusinessError('创建字典项失败', 500);
}
const inserted = insertedArr[0]!;
// 7. 返回统一响应
return successResponse(
{
id: dictId.toString(),
code: body.code,
name: body.name,
value: body.value ?? null,
description: body.description ?? null,
icon: body.icon ?? null,
pid: BigInt(pid),
level,
sortOrder,
status: body.status,
isSystem: body.isSystem ? 1 : 0,
color: body.color ?? null,
extra: body.extra ?? {},
id: String(inserted.id),
code: inserted.code,
name: inserted.name,
value: inserted.value,
description: inserted.description,
icon: inserted.icon,
pid: String(inserted.pid),
level: inserted.level,
sortOrder: inserted.sortOrder,
status: inserted.status,
isSystem: Boolean(inserted.isSystem),
color: inserted.color,
extra: inserted.extra,
createdAt: inserted.createdAt,
updatedAt: inserted.updatedAt,
},
] as any);
// 6. 查询刚插入的数据
const insertedArr = await db().select().from(sysDict).where(eq(sysDict.id, dictId.toString())).limit(1);
if (!insertedArr || insertedArr.length === 0) {
throw new BusinessError('创建字典项失败', 500);
'创建字典项成功',
);
} finally {
await lock.release();
}
const inserted = insertedArr[0]!;
// 7. 返回统一响应
return successResponse(
{
id: String(inserted.id),
code: inserted.code,
name: inserted.name,
value: inserted.value,
description: inserted.description,
icon: inserted.icon,
pid: String(inserted.pid),
level: inserted.level,
sortOrder: inserted.sortOrder,
status: inserted.status,
isSystem: Boolean(inserted.isSystem),
color: inserted.color,
extra: inserted.extra,
createdAt: inserted.createdAt,
updatedAt: inserted.updatedAt,
},
'创建字典项成功',
);
}
/**
@ -328,78 +336,83 @@ export class DictService {
* @returns Promise<UpdateDictSuccessType>
*/
public async updateDict(id: string, body: UpdateDictBodyRequest): Promise<UpdateDictSuccessType> {
Logger.info(`更新字典项: ${id}, body: ${JSON.stringify(body)}`);
const lock = await DistributedLockService.acquire({ key: `dict:update:${id}`, ttl: 10 });
try {
Logger.info(`更新字典项: ${id}, body: ${JSON.stringify(body)}`);
// 1. 检查字典项是否存在
const existingArr = await db().select().from(sysDict).where(eq(sysDict.id, id)).limit(1);
if (existingArr.length === 0) {
throw new BusinessError(`字典项不存在: ${id}`, 404);
}
const existing = existingArr[0]!;
// 2. 唯一性校验
if (body.code) {
const existCode = await db()
.select({ id: sysDict.id })
.from(sysDict)
.where(and(eq(sysDict.code, body.code), ne(sysDict.id, id)))
.limit(1);
if (existCode.length > 0) {
throw new BusinessError(`字典代码已存在: ${body.code}`, 409);
// 1. 检查字典项是否存在
const existingArr = await db().select().from(sysDict).where(eq(sysDict.id, id)).limit(1);
if (existingArr.length === 0) {
throw new BusinessError(`字典项不存在: ${id}`, 404);
}
}
if (body.name) {
const pid = body.pid ? String(body.pid) : existing.pid ? String(existing.pid) : '0';
const existName = await db()
.select({ id: sysDict.id })
.from(sysDict)
.where(and(eq(sysDict.name, body.name), eq(sysDict.pid, pid), ne(sysDict.id, id)))
.limit(1);
if (existName.length > 0) {
throw new BusinessError(`同级下字典名称已存在: ${body.name}`, 409);
}
}
const existing = existingArr[0]!;
// 3. 构建更新数据
const updateData: Partial<typeof existing> = {};
for (const key in body) {
if (Object.prototype.hasOwnProperty.call(body, key) && body[key as keyof typeof body] !== undefined) {
if (key === 'isSystem') {
(updateData as any)[key] = body[key] ? 1 : 0;
} else {
(updateData as any)[key] = body[key as keyof typeof body];
// 2. 唯一性校验
if (body.code) {
const existCode = await db()
.select({ id: sysDict.id })
.from(sysDict)
.where(and(eq(sysDict.code, body.code), ne(sysDict.id, id)))
.limit(1);
if (existCode.length > 0) {
throw new BusinessError(`字典代码已存在: ${body.code}`, 409);
}
}
if (body.name) {
const pid = body.pid ? String(body.pid) : existing.pid ? String(existing.pid) : '0';
const existName = await db()
.select({ id: sysDict.id })
.from(sysDict)
.where(and(eq(sysDict.name, body.name), eq(sysDict.pid, pid), ne(sysDict.id, id)))
.limit(1);
if (existName.length > 0) {
throw new BusinessError(`同级下字典名称已存在: ${body.name}`, 409);
}
}
}
if (Object.keys(updateData).length === 0) {
// 3. 构建更新数据
const updateData: Partial<typeof existing> = {};
for (const key in body) {
if (Object.prototype.hasOwnProperty.call(body, key) && body[key as keyof typeof body] !== undefined) {
if (key === 'isSystem') {
(updateData as any)[key] = body[key] ? 1 : 0;
} else {
(updateData as any)[key] = body[key as keyof typeof body];
}
}
}
if (Object.keys(updateData).length === 0) {
return successResponse(
{
...existing,
id: String(existing.id),
pid: String(existing.pid),
isSystem: Boolean(existing.isSystem),
},
'未提供任何更新内容',
);
}
// 4. 执行更新
await db().update(sysDict).set(updateData).where(eq(sysDict.id, id));
// 5. 查询并返回更新后的数据
const updatedArr = await db().select().from(sysDict).where(eq(sysDict.id, id)).limit(1);
const updated = updatedArr[0]!;
return successResponse(
{
...existing,
id: String(existing.id),
pid: String(existing.pid),
isSystem: Boolean(existing.isSystem),
...updated,
id: String(updated.id),
pid: String(updated.pid),
isSystem: Boolean(updated.isSystem),
},
'未提供任何更新内容',
'更新字典项成功',
);
} finally {
await lock.release();
}
// 4. 执行更新
await db().update(sysDict).set(updateData).where(eq(sysDict.id, id));
// 5. 查询并返回更新后的数据
const updatedArr = await db().select().from(sysDict).where(eq(sysDict.id, id)).limit(1);
const updated = updatedArr[0]!;
return successResponse(
{
...updated,
id: String(updated.id),
pid: String(updated.pid),
isSystem: Boolean(updated.isSystem),
},
'更新字典项成功',
);
}
/**
@ -410,87 +423,91 @@ export class DictService {
* @description 使SQL保证性能和类型安全
*/
public async sortDict(body: SortDictRequest): Promise<SortDictSuccessType> {
const { items } = body;
if (!items || items.length === 0) {
throw new BusinessError('排序项列表不能为空', 400);
}
const itemIds = items.map((item) => BigInt(item.id));
const parentIds = [
...new Set(
items
.map((item) => item.pid)
.filter((pid) => pid !== 0 && pid !== '0')
.map((pid) => BigInt(pid as string)),
),
];
const lock = await DistributedLockService.acquire({ key: `dict:sort`, ttl: 30 });
try {
await db().transaction(async (tx) => {
// 1. 使用原生SQL验证所有待排序的字典项是否存在
if (itemIds.length > 0) {
const result: any[] = await tx.execute(
sql`SELECT id FROM sys_dict WHERE id IN ${itemIds}`,
);
const existingIds = result[0].map((row: { id: bigint }) => row.id);
if (existingIds.length !== itemIds.length) {
const foundIds = new Set(existingIds);
const notFoundIds = itemIds.filter((id) => !foundIds.has(id));
Logger.warn(`部分字典项不存在:${notFoundIds.join(', ')}`);
throw new BusinessError(`部分字典项不存在: ${notFoundIds.join(', ')}`, 404);
}
}
// 2. 使用原生SQL验证所有父级字典是否存在
if (parentIds.length > 0) {
const result: any[] = await tx.execute(
sql`SELECT id FROM sys_dict WHERE id IN ${parentIds}`,
);
const existingParentIds = result[0].map((row: { id: bigint }) => row.id);
if (existingParentIds.length !== parentIds.length) {
const foundParentIds = new Set(existingParentIds);
const notFoundParentIds = parentIds.filter((id) => !foundParentIds.has(id));
Logger.warn(`部分父级字典不存在:${notFoundParentIds.join(', ')}`);
throw new BusinessError(
`部分父级字典不存在: ${notFoundParentIds.join(', ')}`,
404,
);
}
}
// 3. 使用原生SQL构建并执行批量更新
if (items.length > 0) {
const pidSqlChunks = items.map(
(item) =>
sql`WHEN ${BigInt(item.id)} THEN ${
item.pid === 0 || item.pid === '0' ? BigInt(0) : BigInt(item.pid)
}`,
);
const sortOrderSqlChunks = items.map(
(item) => sql`WHEN ${BigInt(item.id)} THEN ${item.sortOrder}`,
);
const finalUpdateQuery = sql`
UPDATE sys_dict
SET
pid = CASE id ${sql.join(pidSqlChunks, sql` `)} END,
sort_order = CASE id ${sql.join(sortOrderSqlChunks, sql` `)} END,
updated_at = CURRENT_TIMESTAMP
WHERE id IN ${itemIds}
`;
await tx.execute(finalUpdateQuery);
}
});
} catch (error) {
if (error instanceof BusinessError) {
throw error;
const { items } = body;
if (!items || items.length === 0) {
throw new BusinessError('排序项列表不能为空', 400);
}
Logger.error('字典项排序失败', { error });
throw new BusinessError('字典项排序失败,请稍后重试', 500);
}
Logger.info('字典项排序成功');
return successResponse(null, '字典项排序成功');
const itemIds = items.map((item) => BigInt(item.id));
const parentIds = [
...new Set(
items
.map((item) => item.pid)
.filter((pid) => pid !== 0 && pid !== '0')
.map((pid) => BigInt(pid as string)),
),
];
try {
await db().transaction(async (tx) => {
// 1. 使用原生SQL验证所有待排序的字典项是否存在
if (itemIds.length > 0) {
const result: any[] = await tx.execute(
sql`SELECT id FROM sys_dict WHERE id IN ${itemIds}`,
);
const existingIds = result[0].map((row: { id: bigint }) => row.id);
if (existingIds.length !== itemIds.length) {
const foundIds = new Set(existingIds);
const notFoundIds = itemIds.filter((id) => !foundIds.has(id));
Logger.warn(`部分字典项不存在:${notFoundIds.join(', ')}`);
throw new BusinessError(`部分字典项不存在: ${notFoundIds.join(', ')}`, 404);
}
}
// 2. 使用原生SQL验证所有父级字典是否存在
if (parentIds.length > 0) {
const result: any[] = await tx.execute(
sql`SELECT id FROM sys_dict WHERE id IN ${parentIds}`,
);
const existingParentIds = result[0].map((row: { id: bigint }) => row.id);
if (existingParentIds.length !== parentIds.length) {
const foundParentIds = new Set(existingParentIds);
const notFoundParentIds = parentIds.filter((id) => !foundParentIds.has(id));
Logger.warn(`部分父级字典不存在:${notFoundParentIds.join(', ')}`);
throw new BusinessError(
`部分父级字典不存在: ${notFoundParentIds.join(', ')}`,
404,
);
}
}
// 3. 使用原生SQL构建并执行批量更新
if (items.length > 0) {
const pidSqlChunks = items.map(
(item) =>
sql`WHEN ${BigInt(item.id)} THEN ${
item.pid === 0 || item.pid === '0' ? BigInt(0) : BigInt(item.pid)
}`,
);
const sortOrderSqlChunks = items.map(
(item) => sql`WHEN ${BigInt(item.id)} THEN ${item.sortOrder}`,
);
const finalUpdateQuery = sql`
UPDATE sys_dict
SET
pid = CASE id ${sql.join(pidSqlChunks, sql` `)} END,
sort_order = CASE id ${sql.join(sortOrderSqlChunks, sql` `)} END,
updated_at = CURRENT_TIMESTAMP
WHERE id IN ${itemIds}
`;
await tx.execute(finalUpdateQuery);
}
});
} catch (error) {
if (error instanceof BusinessError) {
throw error;
}
throw new BusinessError('字典项排序失败,请稍后重试', 500);
}
Logger.info('字典项排序成功');
return successResponse(null, '字典项排序成功');
} finally {
await lock.release();
}
}
/**
@ -504,74 +521,78 @@ export class DictService {
id,
cascade,
}: DeleteDictRequest): Promise<DeleteDictSuccessType> {
const dictId = BigInt(id);
Logger.info(`请求删除字典项: ${dictId}, 是否级联: ${!!cascade}`);
const lock = await DistributedLockService.acquire({ key: `dict:delete:${id}`, ttl: 10 });
try {
await db().transaction(async (tx) => {
// 1. 查找字典项
const dictResult: any[] = await tx.execute(
sql`SELECT * FROM sys_dict WHERE id = ${dictId} LIMIT 1`,
);
const dictToDelete = dictResult[0] ? dictResult[0][0] : null;
const dictId = BigInt(id);
Logger.info(`请求删除字典项: ${dictId}, 是否级联: ${!!cascade}`);
if (!dictToDelete) {
Logger.warn(`删除失败,字典项不存在: ${dictId}`);
throw new BusinessError('字典项不存在', 404);
}
// 2. 系统字典不允许删除
if (dictToDelete.is_system) {
Logger.warn(`删除失败,系统字典不允许删除: ${dictId}`);
throw new BusinessError('系统字典不允许删除', 409);
}
// 3. 查找所有子孙节点
const childrenIds: bigint[] = [];
if (cascade) {
const allDicts = await tx.query.sysDict.findMany({
columns: { id: true, pid: true },
});
const allDictsBigInt = allDicts.map((d) => ({
id: BigInt(d.id),
pid: d.pid ? BigInt(d.pid) : null,
}));
this.findAllChildrenIds(dictId, allDictsBigInt, childrenIds);
} else {
// 如果不级联,检查是否有直接子节点
const childResult: any[] = await tx.execute(
sql`SELECT id FROM sys_dict WHERE pid = ${dictId} LIMIT 1`,
try {
await db().transaction(async (tx) => {
// 1. 查找字典项
const dictResult: any[] = await tx.execute(
sql`SELECT * FROM sys_dict WHERE id = ${dictId} LIMIT 1`,
);
if (childResult[0] && childResult[0].length > 0) {
Logger.warn(`删除失败,存在子字典项,且未启用级联删除: ${dictId}`);
throw new BusinessError(
'存在子字典项,无法删除,请先删除子项或使用级联删除',
400,
const dictToDelete = dictResult[0] ? dictResult[0][0] : null;
if (!dictToDelete) {
Logger.warn(`删除失败,字典项不存在: ${dictId}`);
throw new BusinessError('字典项不存在', 404);
}
// 2. 系统字典不允许删除
if (dictToDelete.is_system) {
Logger.warn(`删除失败,系统字典不允许删除: ${dictId}`);
throw new BusinessError('系统字典不允许删除', 409);
}
// 3. 查找所有子孙节点
const childrenIds: bigint[] = [];
if (cascade) {
const allDicts = await tx.query.sysDict.findMany({
columns: { id: true, pid: true },
});
const allDictsBigInt = allDicts.map((d) => ({
id: BigInt(d.id),
pid: d.pid ? BigInt(d.pid) : null,
}));
this.findAllChildrenIds(dictId, allDictsBigInt, childrenIds);
} else {
// 如果不级联,检查是否有直接子节点
const childResult: any[] = await tx.execute(
sql`SELECT id FROM sys_dict WHERE pid = ${dictId} LIMIT 1`,
);
if (childResult[0] && childResult[0].length > 0) {
Logger.warn(`删除失败,存在子字典项,且未启用级联删除: ${dictId}`);
throw new BusinessError(
'存在子字典项,无法删除,请先删除子项或使用级联删除',
400,
);
}
}
const idsToDelete = [dictId, ...childrenIds];
Logger.info(`准备删除以下字典项ID: ${idsToDelete.join(', ')}`);
// 4. 使用原生SQL执行软删除
if (idsToDelete.length > 0) {
await tx.execute(
sql`UPDATE sys_dict SET status = 'inactive' WHERE id IN ${idsToDelete}`,
);
}
});
} catch (error) {
if (error instanceof BusinessError) {
throw error;
}
const idsToDelete = [dictId, ...childrenIds];
Logger.info(`准备删除以下字典项ID: ${idsToDelete.join(', ')}`);
// 4. 使用原生SQL执行软删除
if (idsToDelete.length > 0) {
await tx.execute(
sql`UPDATE sys_dict SET status = 'inactive' WHERE id IN ${idsToDelete}`,
);
}
});
} catch (error) {
if (error instanceof BusinessError) {
throw error;
throw new BusinessError('字典项删除失败,请稍后重试', 500);
}
Logger.error('字典项删除失败', { error });
throw new BusinessError('字典项删除失败,请稍后重试', 500);
Logger.info(`字典项 ${dictId} 及关联子项(如有)已成功软删除`);
return successResponse(null, '字典项删除成功');
} finally {
await lock.release();
}
Logger.info(`字典项 ${dictId} 及关联子项(如有)已成功软删除`);
return successResponse(null, '字典项删除成功');
}
/**