ETL Service
Назначение
etl принимает сырые данные из внешних систем, валидирует и нормализует их, преобразует в canonical-команды доменной системы, диспатчит их в downstream сервисы через RabbitMQ RPC и сохраняет журнал импорта. Это интеграционный сервис между внешними системами (ZUP, ERP, MES, SCADA, LIMS) и внутренними сервисами production и personnel.
Как сервис встроен в систему
- Поднимает HTTP API с глобальным префиксом
api/v1. - Основной контроллер расположен по маршруту
/etl. - Для импорта и просмотра истории использует JWT/authz (
@Auth(UserRole.ADMIN)). - Для доставки в downstream сервисы использует RabbitMQ request/reply.
- Собственные события об импорте публикует в
efko.etl.events.
Основные модули
IngestionModule: HTTP-вход, auth, приём JSON и файлов, запуск pipeline.TransformModule: реестр transformer-ов по source system.ImportsModule: журнал импортов и логов трансформации в Mongo.DispatchModule: RabbitMQ dispatch с retry/backoff.RabbitModule: Rabbit transport иEventEmitterService.MongoModule/ Mongoose infrastructure: схемыRawImport,TransformationLog, GridFS-хранилище исходных файлов.
HTTP API
Base URL: http://localhost:4200/api/v1
Аутентификация: Bearer accessToken (для всех эндпоинтов)
Роль: ADMIN (требуется для всех эндпоинтов)
Глобально включены ValidationPipe, LoggingInterceptor, HttpExceptionFilter и RequestIdMiddleware.
POST /api/v1/etl/import
Импорт массива записей в JSON.
Authentication: Bearer accessToken
Role: ADMIN
CSRF: Требуется для браузера
Request Body:
{
source_system: SourceSystem; // '1c_zup' | '1c_erp' | 'mes' | 'scada' | 'lims'
import_type: ImportType; // 'employees' | 'departments' | 'positions' | 'products' | 'orders' | 'sensors' | 'quality'
data: Array<Record<string, any>>; // Массив записей для импорта
}
Response:
{
import_id: string; // MongoDB ObjectId импорта
status: ImportStatus; // 'pending' | 'processing' | 'completed' | 'failed'
records_count: number;
source_file_id?: string;
warnings?: string[];
parse_errors?: Array<{ index: number; field: string; message: string }>;
}
Пример запроса:
curl -X POST http://localhost:4200/api/v1/etl/import \
-H "Authorization: Bearer <accessToken>" \
-H "Content-Type: application/json" \
-d '{
"source_system": "1c_zup",
"import_type": "employees",
"data": [
{
"ТабельныйНомер": "EMP-0001",
"ФИО": "Иванов Иван Иванович",
"ДатаРождения": "1985-05-15"
}
]
}'
Ошибки:
- 400 — Некорректный формат или отсутствует обязательное поле
- 401 — Не авторизован
- 403 — Недостаточно прав
POST /api/v1/etl/import/file
Импорт файла (xlsx или json) через multipart upload.
Authentication: Bearer accessToken
Role: ADMIN
CSRF: Требуется для браузера
Max file size: 20 MB
Request Body (multipart/form-data):
file: <binary file> // xlsx или json файл
source_system: string // ZUP | ERP | MES | SCADA | LIMS
import_type: string // тип импорта
Response:
{
import_id: string; // MongoDB ObjectId импорта
status: 'processing'; // Файл принят и обрабатывается
records_count: number;
source_file_id?: string; // ID файла в GridFS
format?: 'xlsx' | 'json';
warnings?: string[];
parse_errors?: Array<{ index: number; field: string; message: string }>;
}
Пример запроса:
curl -X POST http://localhost:4200/api/v1/etl/import/file \
-H "Authorization: Bearer <accessToken>" \
-F "file=@employees.xlsx" \
-F "source_system=ZUP" \
-F "import_type=employees"
Ошибки:
- 400 — Неподдерживаемый формат файла или превышен размер
- 401 — Не авторизован
- 403 — Недостаточно прав
GET /api/v1/etl/imports
Получить список импортов.
Authentication: Bearer accessToken
Role: ADMIN
CSRF: Не требуется
Query Parameters:
{
source_system?: SourceSystem; // Фильтр: '1c_zup' | '1c_erp' | 'mes' | 'scada' | 'lims'
status?: ImportStatus; // Фильтр: 'pending' | 'processing' | 'completed' | 'failed'
limit?: number; // Максимальное количество записей (1–500, по умолчанию 20)
}
Response:
Array<{
import_id: string;
source_system: SourceSystem;
import_type: ImportType;
status: ImportStatus;
records_count: number;
source_file_id?: string;
source_file_format?: 'xlsx' | 'json';
created_at: string;
processed_at: string | null;
}>
Пример запроса:
curl -X GET "http://localhost:4200/api/v1/etl/imports?source_system=ZUP&limit=10" \
-H "Authorization: Bearer <accessToken>"
GET /api/v1/etl/imports/:id
Получить детали импорта.
Authentication: Bearer accessToken
Role: ADMIN
CSRF: Не требуется
Response:
{
import_id: string;
source_system: SourceSystem;
import_type: ImportType;
status: ImportStatus;
records_count: number;
source_file_id?: string;
source_file_format?: 'xlsx' | 'json';
created_at: string;
processed_at: string | null;
stats: {
total: number;
success: number;
error: number;
skipped: number;
};
errors: Array<{ field: string; message: string; record_index?: number }>;
}
Пример запроса:
curl -X GET http://localhost:4200/api/v1/etl/imports/507f1f77bcf86cd799439011 \
-H "Authorization: Bearer <accessToken>"
Ошибки:
- 404 — Импорт не найден
GET /api/v1/etl/imports/:id/file
Скачать исходный файл импорта.
Authentication: Bearer accessToken
Role: ADMIN
CSRF: Не требуется
Response:
Бинарный поток файла (application/octet-stream или исходный MIME-type)
Пример запроса:
curl -X GET http://localhost:4200/api/v1/etl/imports/507f1f77bcf86cd799439011/file \
-H "Authorization: Bearer <accessToken>" \
-o employees.xlsx
Ошибки:
- 404 — Импорт или файл не найден
POST /api/v1/etl/imports/:id/retry
Повторить импорт (только для импортов со статусом failed).
Authentication: Bearer accessToken
Role: ADMIN
CSRF: Требуется для браузера
Request Body:
Response:
Пример запроса:
curl -X POST http://localhost:4200/api/v1/etl/imports/507f1f77bcf86cd799439011/retry \
-H "Authorization: Bearer <accessToken>" \
-H "Content-Type: application/json" \
-d '{}'
Ошибки:
- 400 — Импорт не может быть повторен (не в статусе failed)
- 404 — Импорт не найден
Ingestion
IngestionControllerизвлекает request metadata из HTTP headers и передаёт их дальше в pipeline.IngestionService.processImport(...):- проверяет корректность
sourceSystem; - создаёт новую запись импорта или переиспользует существующую при retry;
- поднимает статус
processing; - получает import schema по паре
source_system+import_type; - нормализует и валидирует входные записи через
validateRecords. - Поддерживаемые source system и import types по коду:
1c_zup:employees,departments,positions1c_erp:productsmes:ordersscada:sensorslims:quality- Схемы поддерживают alias-ы входных полей, включая 1C-ключи на русском, и coercion типов (
string,number,date-string,boolean).
Transform
TransformerRegistryвыбирает transformer поSourceSystem.- Реализованы transformer-ы:
ZupTransformerErpTransformerMesTransformerScadaTransformerLimsTransformer- Трансформация идёт в canonical records вида:
entityTypesourceIdcanonicalIdpayloadexchangeroutingKey- Важные маршруты downstream:
ZUP-> командыpersonnelERP,MES,SCADA,LIMS-> командыproduction- Mapper-ы в коде переводят внешние enum-ы в внутренние доменные enum-ы и выставляют routing key из
@efko-kernel/contracts. - Особый случай
SCADA: alarms распознаются на уровне transformer-а, но в комментарии явно указано, что alarms только логируются и не диспатчатся как сущности.
Imports
ImportsServiceхранит запись импорта в коллекцииRawImport.RawImportсодержит:source_systemimport_typeraw_payloadstatusrecords_count- массив
errors processed_at- ссылку на исходный файл и его формат, если импорт шёл из файла.
- Каждый dispatch логируется в
TransformationLog: import_identity_typesource_idcanonical_idtransformation_result(success/error/skipped)error_message- Для retry старые transformation logs удаляются, а импорт переводится обратно в
processing.
Dispatch
DispatchServiceиспользуетamqpConnection.request(...).- Доставка идёт с exponential backoff:
maxRetries: 3baseDelayMs: 1000maxDelayMs: 30000backoffMultiplier: 2- В headers передаётся
correlationId, если он есть в request metadata. - Если dispatch конкретной canonical-записи падает, сервис:
- фиксирует ошибку в
TransformationLog; - добавляет запись в
errorsимпорта; - продолжает обработку остальных записей.
- Итоговый статус импорта:
failed, если transform не дал ни одной canonical-записи;failed, если все dispatch-операции упали;completed, если есть хотя бы частичный успех.
Хранение данных
MongoDB / Mongoose:
RawImport: журнал сырых импортов и их статусов.TransformationLog: построчный лог трансформации и dispatch результата.- GridFS bucket
etl_source_files: хранение исходных файлов импорта.
GridFS metadata включает:
sourceSystemimportTypeformatuploadedBymimeuploadedAt
Интеграции
- Входящие системы (значения
SourceSystemenum): 1c_zup(1С:ЗУП)1c_erp(1С:ERP)mes(MES)scada(SCADA)lims(LIMS)- Исходящие сервисы:
personnelчерезefko.personnel.commandsproductionчерезefko.production.commands- Собственные события ETL:
EtlImportCompletedEventEtlImportFailedEvent- публикуются в
efko.etl.events
Примеры бизнес-маршрутизации
1c_zup employees->PersonnelCreateEmployeeCommand1c_zup departments->PersonnelCreateDepartmentCommand1c_zup positions->PersonnelCreatePositionCommand1c_zup shift templatesподдерживаются mapper-ом, но отдельная schema для такогоimport_typeвIMPORT_SCHEMASв текущем файле не описана1c_erp products->ProductionCreateProductCommand1c_erp salesиinventoryподдерживаются mapper-ом, но вIMPORT_SCHEMASдля них сейчас нет отдельных схемmes orders->ProductionCreateOrderCommandmes outputподдерживается mapper-ом, но отдельная schema/import type вIMPORT_SCHEMASсейчас не описаныscada sensors->ProductionRecordSensorReadingCommandlims quality->ProductionRecordQualityResultCommand
Обработка ошибок
- HTTP ошибки идут через глобальный
HttpExceptionFilter. - Пустой upload-файл, отсутствие import schema и невалидный retry-status дают
BadRequestException/NotFoundException. - Ошибки transform и dispatch не обязательно валят весь импорт немедленно: сервис стремится обработать максимум записей и сохраняет частичные ошибки в журнал.
- Если файл уже сохранён в GridFS, но импорт дальше не создался,
FileIngestionServiceвыполняет compensating delete.
Observability и logging
- Логирование через
nestjs-pino, dev-лог вlogs/etl.log. RequestIdMiddlewareнавешивается на все маршруты.LoggingInterceptorпишет HTTP request metadata и duration.- Сервис насыщенно использует
buildLogContext(...)для связи логов поcorrelationId/request metadata. - Логируются:
- старт импорта;
- результат трансформации;
- ошибки dispatch конкретных записей;
- финальный статус импорта;
- сохранение/получение/удаление файлов в GridFS.
Зависимости
- NestJS
- Mongoose + MongoDB + GridFS
- RabbitMQ
- JWT auth из
@efko-kernel/nest-utils xlsxдля Excel parsing@efko-kernel/contracts@efko-kernel/interfaces
Наблюдения и пробелы по коду
IMPORT_SCHEMASи mapper-ы покрывают разные множества import types: mapper-ы уже умеют больше, чем разрешает текущая схема в ingestion.- В документации по supported imports выше отражено именно то, что реально следует из
IMPORT_SCHEMASи mapper-ов на текущий момент. - ETL не хранит доменные сущности downstream; его персистентность ограничена журналом импортов, логом трансформации и исходными файлами.