Select Git revision
DatabaseService.php
Bertrand Gauthier authored
[FIX] Import : seules les données issues de la source concernée doivent être préalablement purgées, pas toute la table.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
DatabaseService.php 26.06 KiB
<?php
namespace UnicaenDbImport\Service\Database;
use Assert\Assertion;
use Assert\AssertionFailedException;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\ConnectionException;
use Doctrine\DBAL\Exception;
use UnicaenDbImport\CodeGenerator\CodeGenerator;
use UnicaenDbImport\Domain\DestinationInterface;
use UnicaenDbImport\Domain\Operation;
use UnicaenDbImport\Domain\ResultInterface;
use UnicaenDbImport\Domain\SourceInterface;
use UnicaenDbImport\Entity\Db\Service\ImportObserv\ImportObservServiceAwareTrait;
use UnicaenDbImport\QueryExecutor;
use UnicaenDbImport\Service\CodeGenerator\CodeGeneratorPluginManager;
use UnicaenDbImport\Service\Exception\DatabaseServiceException;
class DatabaseService
{
use ImportObservServiceAwareTrait;
/** Name of the table, looked up in the destination database, that lists the declared sources. */
const SOURCE_TABLE_NAME = 'SOURCE';
/** Column of the SOURCE table that is compared with the code of the current source. */
const SOURCE_TABLE_CODE_COLUMN = 'CODE';
/** Number of source rows per slice when walking through the data to insert. */
const DATA_SLICE_SIZE_FOR_INSERT = 1000;
/**
 * Plugin manager from which code generator instances are retrieved.
 *
 * @var CodeGeneratorPluginManager
 */
protected $codeGeneratorPluginManager;
/**
 * Configuration indexed by database platform class name; each value is the name
 * passed to the plugin manager to obtain the matching code generator.
 *
 * @var array
 */
protected $codeGeneratorsConfig;
/**
 * Code generator currently in use, i.e. the one resolved for the destination platform.
 *
 * @var CodeGenerator
 */
protected $codeGenerator;
/**
 * Code generators already resolved, indexed by database platform class name.
 *
 * @var CodeGenerator[]
 */
protected $cachedCodeGenerators = [];
/**
 * Executor through which every generated SQL statement is run.
 *
 * @var QueryExecutor
 */
protected $queryExecutor;
/**
 * Source of the import/synchronisation.
 *
 * @var SourceInterface $source
 */
protected $source;
/**
 * Destination of the import/synchronisation.
 *
 * @var DestinationInterface $destination
 */
protected $destination;
/**
 * Builds the service from the collaborators it needs to generate and run SQL.
 *
 * @param CodeGeneratorPluginManager $codeGeneratorPluginManager Plugin manager providing the code generators.
 * @param array $codeGeneratorsConfig Map: database platform class name => code generator name.
 * @param QueryExecutor $queryExecutor Executor used to run the generated SQL.
 */
public function __construct(
    CodeGeneratorPluginManager $codeGeneratorPluginManager,
    array $codeGeneratorsConfig,
    QueryExecutor $queryExecutor
) {
    $this->queryExecutor = $queryExecutor;
    $this->codeGeneratorsConfig = $codeGeneratorsConfig;
    $this->codeGeneratorPluginManager = $codeGeneratorPluginManager;
}
/**
 * Registers the source whose data and settings the other methods of this service read.
 *
 * @param SourceInterface $source Source of the import/synchronisation.
 */
public function setSource(SourceInterface $source): void
{
    $this->source = $source;
}
/**
 * Registers the destination and immediately re-selects the code generator for it.
 *
 * @param DestinationInterface $destination Destination of the import/synchronisation.
 */
public function setDestination(DestinationInterface $destination): void
{
    $this->destination = $destination;

    // The code generator is chosen from the destination's database platform,
    // so it has to be resolved again each time the destination changes.
    $this->initCodeGeneratorFromDestination();
}
/**
 * Selects, from the destination's database platform, the SQL code generator to use.
 *
 * The platform class name is looked up in the code generators config; the matching generator
 * is fetched from the plugin manager once, then cached per platform class for later calls.
 * The selected generator is stored in $this->codeGenerator and returned.
 *
 * @return CodeGenerator
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException When no destination is set, when the
 *         platform cannot be determined, or when no code generator is configured for the platform.
 */
public function initCodeGeneratorFromDestination(): CodeGenerator
{
    // DatabaseServiceException::error() only builds the exception: it must be thrown explicitly,
    // otherwise execution would carry on with a null destination or an undefined platform class.
    if ($this->destination === null) {
        throw DatabaseServiceException::error("Une destination doit être spécifiée avant l'appel de cette méthode");
    }

    try {
        $platformClass = get_class($this->destination->getConnection()->getDatabasePlatform());
    } catch (Exception $e) {
        throw DatabaseServiceException::error(
            "Impossible de déterminer la plateforme de la base de données destination", $e);
    }

    if (! isset($this->cachedCodeGenerators[$platformClass])) {
        try {
            Assertion::keyExists($this->codeGeneratorsConfig, $platformClass);
        } catch (AssertionFailedException $e) {
            throw DatabaseServiceException::error(
                "Aucun CodeGenerator configuré pour la plateforme de base de données suivante : " . $platformClass, $e);
        }
        $codeGeneratorClass = $this->codeGeneratorsConfig[$platformClass];
        $this->cachedCodeGenerators[$platformClass] = $this->codeGeneratorPluginManager->get($codeGeneratorClass);
    }

    $this->codeGenerator = $this->cachedCodeGenerators[$platformClass];

    return $this->codeGenerator;
}
/**
 * Validates the destination table.
 *
 * Checks run in this order and the first failure aborts with an exception:
 * table existence, id column, id column sequence (skipped when the destination
 * returns false as sequence name), history columns, source columns.
 *
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function validateDestinationTable()
{
    $generator = $this->codeGenerator;
    $executor = $this->queryExecutor;
    $connection = $this->destination->getConnection();
    $table = $this->destination->getTable();
    $sourceCodeColumn = $this->source->getSourceCodeColumn();

    // 1. The table must exist.
    $existenceSql = $generator->generateSQLForTableExistenceCheck($table);
    try {
        $existenceRows = $executor->fetchAll($existenceSql, $connection);
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors du test d'existence de la table destination '$table'", $e);
    }
    if (! $generator->convertTableExistenceCheckResultToBoolean($existenceRows)) {
        throw DatabaseServiceException::error("La table destination '$table' est introuvable. ");
    }

    // 2. The id column must be valid. The counter is handed to the generator as a variable
    //    and then passed on to the result converter.
    $idColumnCount = 0;
    $idColumnSql = $generator->generateSQLForIdColumnValidationInTable($table, $idColumnCount);
    try {
        $idColumnRows = $executor->fetchAll($idColumnSql, $connection);
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de la validation de la colonne id dans la table destination '$table'", $e);
    }
    $idColumnFailure = $generator->convertIdColumnValidationBadResultToException($table, $idColumnRows, $idColumnCount);
    if ($idColumnFailure !== null) {
        throw DatabaseServiceException::error("La table '$table' n'est pas une table de destination valide. ", $idColumnFailure);
    }

    // 3. The sequence feeding the id column must exist (unless the destination declares none).
    $idColumnSequenceName = $this->destination->getIdColumnSequence();
    if ($idColumnSequenceName !== false) {
        $sequenceSql = $generator->generateSQLForSequenceExistenceCheck($table, $idColumnSequenceName);
        try {
            $sequenceRows = $executor->fetchAll($sequenceSql, $connection);
        } catch (Exception $e) {
            throw DatabaseServiceException::error("Erreur lors du test d'existence de la séquence dans la table destination '$table'", $e);
        }
        $sequenceFailure = $generator->convertSequenceExistenceCheckResultToException($idColumnSequenceName, $sequenceRows);
        if ($sequenceFailure !== null) {
            throw DatabaseServiceException::error("La sequence '$idColumnSequenceName' est introuvable. ", $sequenceFailure);
        }
    }

    // 4. The history columns must be valid.
    $histoColumnCount = 0;
    $histoSql = $generator->generateSQLForHistoColumnsValidationInTable($table, $histoColumnCount);
    try {
        $histoRows = $executor->fetchAll($histoSql, $connection);
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de la validation des colonne d'historique dans la table destination '$table'", $e);
    }
    $histoFailure = $generator->convertHistoColumnsValidationBadResultToException($table, $histoRows, $histoColumnCount);
    if ($histoFailure !== null) {
        throw DatabaseServiceException::error("La table '$table' n'est pas une table de destination valide. ", $histoFailure);
    }

    // 5. The source columns must be valid.
    $sourceColumnCount = 0;
    $sourceColumnsSql = $generator->generateSQLForSourceColumnsValidationInTable($table, $sourceCodeColumn, $sourceColumnCount);
    try {
        $sourceColumnsRows = $executor->fetchAll($sourceColumnsSql, $connection);
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de la validation des colonne de source dans la table destination '$table'", $e);
    }
    $sourceColumnsFailure = $generator->convertSourceColumnsValidationBadResultToException($table, $sourceCodeColumn, $sourceColumnsRows, $sourceColumnCount);
    if ($sourceColumnsFailure !== null) {
        throw DatabaseServiceException::error("La table '$table' n'est pas une table de destination valide. ", $sourceColumnsFailure);
    }
}
/**
 * Checks that the 'SOURCE' table exists in the destination database.
 *
 * CAUTION: this is the table literally named `SOURCE` that must exist in the destination database,
 * not the table holding the source data of the import/synchronisation.
 *
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function validateSourceTable()
{
    $tableName = self::SOURCE_TABLE_NAME;
    $existenceSql = $this->codeGenerator->generateSQLForTableExistenceCheck($tableName);

    try {
        $rows = $this->queryExecutor->fetchAll($existenceSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors du test d'existence de la table '$tableName'", $e);
    }

    if ($this->codeGenerator->convertTableExistenceCheckResultToBoolean($rows)) {
        return;
    }

    throw DatabaseServiceException::error(
        "La table '$tableName' nécessaire au fonctionnement est introuvable dans la base destination. ");
}
/**
 * Checks that the code of the current source is declared in the 'SOURCE' table of the destination.
 *
 * Nothing is checked when the source has no code (null).
 *
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function validateSourceNameExists()
{
    $connection = $this->destination->getConnection();
    $value = $this->source->getCode();
    if ($value === null) {
        return;
    }

    $tableName = self::SOURCE_TABLE_NAME;
    $column = self::SOURCE_TABLE_CODE_COLUMN;

    // Look the value up in the table.
    $lookupSql = $this->codeGenerator->generateSQLForValueExistenceCheckInTable($column, $value, $tableName);
    try {
        $rows = $this->queryExecutor->fetchAll($lookupSql, $connection);
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors du test d'existence de la valeur '$value' dans la table '$tableName'", $e);
    }

    $found = $this->codeGenerator->convertValueExistenceCheckInTableResultToBoolean($rows);
    if ($found) {
        return;
    }

    throw DatabaseServiceException::error("Vous devez déclarer une source dans la table '$tableName' dont le $column est '$value'.");
}
/**
 * Creates the import log table of the destination, unless it already exists.
 *
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function createImportLogTableIfNotExists()
{
    $connection = $this->destination->getConnection();
    $importLogTable = $this->destination->getLogTable();

    // Is the table already there?
    $existenceSql = $this->codeGenerator->generateSQLForTableExistenceCheck($importLogTable);
    try {
        $rows = $this->queryExecutor->fetchAll($existenceSql, $connection);
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors du test d'existence de la table de log '$importLogTable'", $e);
    }

    if (! $this->codeGenerator->convertTableExistenceCheckResultToBoolean($rows)) {
        // No: create it.
        $creationSql = $this->codeGenerator->generateSQLForImportLogTableCreation($this->destination->getLogTable());
        try {
            $this->queryExecutor->exec($creationSql, $connection);
        } catch (Exception $e) {
            throw DatabaseServiceException::error("Erreur lors de la création de la table de log '$importLogTable'", $e);
        }
    }
}
/**
 * Fails if a table named like the given intermediate table already exists in the destination database.
 *
 * @param string $intermediateTable Name of the intermediate table about to be created.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function checkIntermediateTableNotExists($intermediateTable)
{
    $existenceSql = $this->codeGenerator->generateSQLForTableExistenceCheck($intermediateTable);

    // NOTE(review): the other table existence checks of this class feed the converter with the
    // result of fetchAll(), whereas this one uses fetch() — confirm the converter accepts both.
    try {
        $fetched = $this->queryExecutor->fetch($existenceSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors du test d'existence de la table intermédiaire '$intermediateTable'", $e);
    }

    if (! $this->codeGenerator->convertTableExistenceCheckResultToBoolean($fetched)) {
        return;
    }

    throw DatabaseServiceException::error(
        "Une table $intermediateTable existe déjà. Veuillez la supprimer ou alors spécifier dans la config " .
        "le nom à utiliser pour générer la table intermédiaire nécessaire à l'import dans la table " .
        $this->destination->getTable());
}
/**
 * Creates the intermediate table in the destination database.
 *
 * @param string $intermediateTable Name of the intermediate table to create.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function createIntermediateTable($intermediateTable)
{
    $creationSql = $this->codeGenerator->generateSQLForIntermediateTableCreation(
        $this->source,
        $this->destination,
        $intermediateTable
    );

    try {
        $this->queryExecutor->exec($creationSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de la création de la table intermédiaire '$intermediateTable'", $e);
    }
}
/**
 * Drops the intermediate table from the destination database.
 *
 * @param string $intermediateTable Name of the intermediate table to drop.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function dropIntermediateTable($intermediateTable)
{
    $dropSql = $this->codeGenerator->generateSQLForIntermmediateTableDrop($intermediateTable);

    try {
        $this->queryExecutor->exec($dropSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de la suppression de la table intermédiaire '$intermediateTable'", $e);
    }
}
/**
 * Deletes from the destination table the rows that belong to the source being imported.
 *
 * Despite its name, this method does not empty the whole table: the "clear table" statement is
 * restricted to the rows whose source_id references the source identified by its code.
 * The code is read from the first prepared source row (source_id / SOURCE_ID column) and, when the
 * rows do not carry it, from the code configured on the source.
 * Nothing is deleted when the source has no data.
 *
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function truncateDestinationTable()
{
    // Work out which source the data to insert belongs to, so that only that source's rows are erased
    // (nothing to insert means nothing to erase).
    $data = $this->source->getData();
    if (empty($data)) {
        return;
    }

    $firstRow = $this->prepareSourceData(reset($data));

    // Rows are only guaranteed to carry the source code when none is configured on the source;
    // otherwise the configured code is the one to use.
    $sourceCode = $firstRow['source_id'] ?? $firstRow['SOURCE_ID'] ?? $this->source->getCode();

    $connection = $this->destination->getConnection();
    $table = $this->destination->getTable();

    try {
        // The value is quoted by the connection rather than interpolated raw into the statement.
        $sql = $this->codeGenerator->generateSQLForClearTable($table);
        $sql .= " WHERE source_id = (SELECT id FROM source WHERE code = " . $connection->quote((string) $sourceCode) . ")";
        $this->queryExecutor->exec($sql, $connection);
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors du vidage de la table destination '$table'", $e);
    }
}
/**
 * Replaces, within a single transaction, the destination rows of the current source by the source data.
 *
 * The rows of the source are first removed from the destination table, then the source data are
 * inserted. Any failure rolls the transaction back and is reported as a DatabaseServiceException.
 *
 * @return int Value returned by populateTableFromSource() for the destination table.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function populateDestinationTableFromSource(): int
{
    $destinationTable = $this->destination->getTable();
    $idColumnSequenceName = $this->destination->getIdColumnSequence();
    $connection = $this->destination->getConnection();

    $connection->beginTransaction();
    try {
        // The purge is part of the guarded section: if it fails, the transaction must be rolled back too.
        $this->truncateDestinationTable();
        $result = $this->populateTableFromSource($destinationTable, $idColumnSequenceName);
        $connection->commit();
    } catch (DatabaseServiceException $e) {
        $this->rollBack($connection);
        throw $e;
    } catch (Exception $e) {
        // A database-level failure (e.g. on commit) must not be silently turned into a return value.
        $this->rollBack($connection);
        throw DatabaseServiceException::error(
            "Erreur rencontrée lors de la validation (commit) du remplissage de la table destination '$destinationTable'", $e);
    }

    return $result;
}
/**
 * Rolls back the current transaction of the given connection.
 *
 * @param Connection $connection Connection whose transaction is cancelled.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException When the rollback itself fails.
 */
protected function rollBack(Connection $connection)
{
    try {
        $connection->rollBack();
    } catch (ConnectionException $rollbackFailure) {
        throw DatabaseServiceException::error("Erreur rencontrée lors du rollback !", $rollbackFailure);
    }
}
/**
 * Inserts the source data, one row at a time, into the given table.
 *
 * Each row goes through prepareSourceData() then prepareDestinationData() before its INSERT
 * statement is generated and executed on the destination connection.
 *
 * @param string $tableName Table to fill.
 * @param string|null|false $idColumnSequence Value forwarded to the INSERT statement generator.
 * @return int Sum of the values returned by the executor for each INSERT.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function populateTableFromSource(string $tableName, $idColumnSequence = null): int
{
    $sourceCode = $this->source->getCode();
    $inserted = 0;

    // Walk through the data slice by slice.
    $slices = array_chunk($this->source->getData(), self::DATA_SLICE_SIZE_FOR_INSERT);
    foreach ($slices as $slice) {
        foreach ($slice as $sourceRow) {
            $columnsValues = $this->prepareDestinationData($this->prepareSourceData($sourceRow));
            $insertSql = $this->codeGenerator->generateSQLForInsertOneRowIntoTable(
                $tableName,
                $columnsValues,
                $sourceCode,
                $idColumnSequence
            );
            try {
                $inserted += $this->queryExecutor->exec($insertSql, $this->destination->getConnection());
            } catch (Exception $e) {
                throw DatabaseServiceException::error("Erreur rencontrée lors du remplissage de la table destination '$tableName'", $e);
            }
        }
    }

    return $inserted;
}
/**
 * Applies the source-side filters to one row and makes sure the row identifies its source when needed.
 *
 * Steps: optional value filter on every column, optional name filter on every key, then, only when
 * the source has no code, the row must contain a 'source_id' or 'SOURCE_ID' key
 * ('source_id' is copied to 'SOURCE_ID' when present).
 *
 * @param array $row Raw source row (column name => value).
 * @return array The prepared row.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
private function prepareSourceData(array $row): array
{
    // Optional transformation of column/attribute values.
    $valueFilter = $this->source->getColumnValueFilter();
    if ($valueFilter !== null) {
        $valueFilter->setParams($this->source->getExtra());
        foreach ($row as $name => $value) {
            $row[$name] = $valueFilter->filter($name, $value);
        }
    }

    // Optional transformation of column/attribute names.
    $nameFilter = $this->source->getColumnNameFilter();
    if ($nameFilter !== null) {
        $renamedKeys = [];
        foreach (array_keys($row) as $key) {
            $renamedKeys[] = $nameFilter->filter($key);
        }
        $row = array_combine($renamedKeys, $row);
    }

    // When the source is given by its 'code', the data do not have to identify it.
    if ($this->source->getCode() !== null) {
        return $row;
    }

    // Otherwise the data must include it ('SOURCE_ID' column).
    if (array_key_exists('source_id', $row)) {
        $row['SOURCE_ID'] = $row['source_id'];

        return $row;
    }
    if (array_key_exists('SOURCE_ID', $row)) {
        return $row;
    }

    throw DatabaseServiceException::error(
        "Lorsque la config ne fournie pas le 'code' de la source ({$this->source}), " .
        "les données doivent contenir une colonne SOURCE_ID ou source_id");
}
/**
 * Builds, from a prepared source row, the column => value map to insert into the destination.
 *
 * The map holds the source code column followed by the source columns, with upper-case names;
 * a column for which the row has no value (neither under its upper-case nor its lower-case name)
 * is set to null. The destination value filter, if any, is then applied to every value.
 *
 * @param array $row Prepared source row.
 * @return array Upper-case column name => value.
 */
private function prepareDestinationData(array $row): array
{
    $columnNames = $this->source->getColumns();
    $columnNames = array_merge([$this->source->getSourceCodeColumn()], $columnNames);

    // Upper-case column names; null for columns without a value.
    $columnsValues = [];
    foreach ($columnNames as $name) {
        $upperName = strtoupper($name);
        $columnsValues[$upperName] = $row[$upperName] ?? $row[strtolower($upperName)] ?? null;
    }

    // Optional transformation of column values.
    $valueFilter = $this->destination->getColumnValueFilter();
    if ($valueFilter === null) {
        return $columnsValues;
    }

    $valueFilter->setParams($this->source->getExtra());
    foreach ($columnsValues as $name => $value) {
        $columnsValues[$name] = $valueFilter->filter($name, $value);
    }

    return $columnsValues;
}
/**
 * Generates then executes the SQL creating the differential view in the destination database.
 *
 * @param string|null $intermediateTable Intermediate table name forwarded to the SQL generator, if any.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function recreateDiffView(?string $intermediateTable = null)
{
    // Step 1: SQL generation (reported separately from execution).
    try {
        $viewSql = $this->codeGenerator->generateSQLForDiffViewCreation(
            $this->source,
            $this->destination,
            $intermediateTable
        );
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de la génération du SQL de création de la vue différentielle", $e);
    }

    // Step 2: execution.
    try {
        $this->queryExecutor->exec($viewSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de la création de la vue différentielle", $e);
    }
}
/**
 * Fetches all the rows returned by the select on the differential view.
 *
 * @return array
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function fetchDiffView(): array
{
    $selectSql = $this->codeGenerator->generateSQLForDiffViewSelect($this->source, $this->destination);

    try {
        $rows = $this->queryExecutor->fetchAll($selectSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de l'interrogation de la vue différentielle ", $e);
    }

    return $rows;
}
/**
 * Counts the rows returned by the select on the differential view.
 *
 * The select is wrapped into a "select count(*) as nb" query. The count is read from the 'nb'
 * key of the fetched row, or from 'NB' for connections returning upper-case column names.
 *
 * @return int
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function fetchDiffViewCount(): int
{
    $selectSql = $this->codeGenerator->generateSQLForDiffViewSelect($this->source, $this->destination);
    $countSql = 'select count(*) as nb from (' . $selectSql . ') tmp';

    try {
        $result = $this->queryExecutor->fetch($countSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de l'interrogation de la vue différentielle ", $e);
    }

    // The alias is unquoted, so its case in the result depends on the database platform/driver.
    return (int) ($result['nb'] ?? $result['NB'] ?? 0);
}
/**
 * The IMPORT_OBSERV table is used to request the "detection" of some value changes during the
 * synchronisation. Detected changes are recorded in the IMPORT_OBSERV_RESULT table.
 *
 * This method queries IMPORT_OBSERV to find out which value changes must be detected for the
 * destination table and, if there are any, populates IMPORT_OBSERV_RESULT.
 *
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function populateImportObservResultTable()
{
    // An earlier, commented-out implementation used a Doctrine QueryBuilder on the ImportObserv
    // repository (filtering on table name, operation 'UPDATE' and enabled = 1); the rows are now
    // fetched with generated SQL.
    $connection = $this->destination->getConnection();

    $selectSql = $this->codeGenerator->generateSQLForSelectingInImportObservTable($this->destination->getTable());
    try {
        $importObservRows = $this->queryExecutor->fetchAll($selectSql, $connection);
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors du fetch des ImportObserv", $e);
    }

    // Nothing to observe: nothing to record.
    if (empty($importObservRows)) {
        return;
    }

    $insertSql = $this->codeGenerator->generateSQLForInsertionIntoImportObservResult($this->destination, $importObservRows);
    try {
        $this->queryExecutor->exec($insertSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de la création des ImportObservResult", $e);
    }
}
/**
 * Runs the INSERT, UPDATE, DELETE and UNDELETE operations on the destination, each in its own transaction.
 *
 * A failing operation is rolled back and its exception is stored as the result of that operation;
 * the following operations are still attempted.
 *
 * @return array [operation => int|Exception]
 *
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function updateDestination(): array
{
    $connection = $this->destination->getConnection();
    $outcomes = [];

    foreach ([
        Operation::OPERATION_INSERT,
        Operation::OPERATION_UPDATE,
        Operation::OPERATION_DELETE,
        Operation::OPERATION_UNDELETE,
    ] as $operation) {
        $operationSql = $this->codeGenerator->generateSQLForDestinationUpdate($operation, $this->source, $this->destination);

        $connection->beginTransaction();
        try {
            $outcome = $this->queryExecutor->exec($operationSql, $connection);
            $connection->commit();
        } catch (Exception $e) {
            $this->rollBack($connection);
            // The exception is recorded, not thrown.
            $outcome = DatabaseServiceException::error("Erreur lors de l'opération '$operation'", $e);
        }

        $outcomes[$operation] = $outcome;
    }

    return $outcomes;
}
/**
 * Queries the source, on the source's own connection, to get all the source data.
 *
 * @return array
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function fetchSource(): array
{
    $sourceSql = $this->codeGenerator->generateSQLForSelectFromSource($this->source);

    try {
        $rows = $this->queryExecutor->fetchAll($sourceSql, $this->source->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors de l'interrogation de la table/select", $e);
    }

    return $rows;
}
/**
 * Queries the source, on the source's own connection, to get the first record of the source data.
 *
 * @return array|null
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function fetchSourceFirstRow(): ?array
{
    $sourceSql = $this->codeGenerator->generateSQLForSelectFromSource($this->source);

    try {
        $firstRow = $this->queryExecutor->fetch($sourceSql, $this->source->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Erreur lors du fetch", $e);
    }

    return $firstRow;
}
/**
 * Records the given result into the log table of the destination.
 *
 * @param ResultInterface $result Result to record.
 * @throws \UnicaenDbImport\Service\Exception\DatabaseServiceException
 */
public function saveResultToLogTable(ResultInterface $result)
{
    $importLogTable = $this->destination->getLogTable();
    $logSql = $this->codeGenerator->generateSQLForInsertResultIntoLogTable($result, $importLogTable);

    try {
        $this->queryExecutor->exec($logSql, $this->destination->getConnection());
    } catch (Exception $e) {
        throw DatabaseServiceException::error("Insert impossible dans la table de log '$importLogTable'", $e);
    }
}
}