Select Git revision
Anonymisation.php
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
DatabaseService.php 17.54 KiB
<?php
namespace UnicaenDbImport\Service;
use Assert\Assertion;
use Assert\AssertionFailedException;
use Doctrine\DBAL\DBALException;
use Doctrine\ORM\Query;
use Exception;
use UnicaenApp\Exception\RuntimeException;
use UnicaenDbImport\CodeGenerator\CodeGenerator;
use UnicaenDbImport\Domain\DestinationInterface;
use UnicaenDbImport\Domain\Operation;
use UnicaenDbImport\Domain\ResultInterface;
use UnicaenDbImport\Domain\SourceInterface;
use UnicaenDbImport\Entity\Db\Service\ImportObserv\ImportObservServiceAwareTrait;
use UnicaenDbImport\QueryExecutor;
class DatabaseService
{
use ImportObservServiceAwareTrait;
/**
* @var CodeGeneratorPluginManager
*/
protected $codeGeneratorPluginManager;
/**
* @var array
*/
protected $codeGeneratorsConfig;
/**
* @var CodeGenerator
*/
protected $codeGenerator;
/**
* @var CodeGenerator[]
*/
protected $cachedCodeGenerators = [];
/**
* @var QueryExecutor
*/
protected $queryExecutor;
/**
* @var SourceInterface $source
*/
protected $source;
/**
* @var DestinationInterface $destination
*/
protected $destination;
/**
* DatabaseHelper constructor.
*
* @param CodeGeneratorPluginManager $codeGeneratorPluginManager
* @param array $codeGeneratorsConfig
* @param QueryExecutor $queryExecutor
*/
public function __construct(
CodeGeneratorPluginManager $codeGeneratorPluginManager,
array $codeGeneratorsConfig,
QueryExecutor $queryExecutor)
{
$this->codeGeneratorPluginManager = $codeGeneratorPluginManager;
$this->codeGeneratorsConfig = $codeGeneratorsConfig;
$this->queryExecutor = $queryExecutor;
}
/**
* @param SourceInterface $source
*/
public function setSource(SourceInterface $source): void
{
$this->source = $source;
}
/**
* @param DestinationInterface $destination
*/
public function setDestination(DestinationInterface $destination): void
{
$this->destination = $destination;
$this->initCodeGenerator();
}
/**
* @return CodeGenerator
*/
public function initCodeGenerator()
{
if ($this->destination === null) {
throw new RuntimeException("Une destination doit être spécifiée avant l'appel de cette méthode");
}
try {
$platformClass = get_class($this->destination->getConnection()->getDatabasePlatform());
} catch (DBALException $e) {
throw new RuntimeException(
"Impossible de déterminer la plateforme de la base de données destination", null, $e);
}
if (! isset($this->cachedCodeGenerators[$platformClass])) {
try {
Assertion::keyExists($this->codeGeneratorsConfig, $platformClass);
} catch (AssertionFailedException $e) {
throw new RuntimeException(
"Aucun CodeGenerator configuré pour la plateforme de base de données suivante : " . $platformClass, null, $e);
}
$codeGeneratorClass = $this->codeGeneratorsConfig[$platformClass];
$codeGenerator = $this->codeGeneratorPluginManager->get($codeGeneratorClass);
$this->cachedCodeGenerators[$platformClass] = $codeGenerator;
}
$this->codeGenerator = $this->cachedCodeGenerators[$platformClass];
return $this->codeGenerator;
}
/**
* Validations des éléments requis dans la base de données source.
*/
public function validateSource()
{
$this->validateSourceTableOrView();
}
/**
* Validation de la table ou vue contenant les données sources de l'import ou de la synchro.
*/
protected function validateSourceTableOrView()
{
$connection = $this->source->getConnection();
$table = $this->source->getTable();
// test if table/view exists
$sql = $this->codeGenerator->generateSQLForTableExistenceCheck($table);
$result = $this->queryExecutor->fetchAll($sql, $connection);
$exists = $this->codeGenerator->convertTableExistenceCheckResultToBoolean($result);
if (!$exists) {
throw new RuntimeException("La table source '$table' est introuvable. ");
}
}
/**
* Validations des éléments requis dans la base de données destination.
*/
public function validateDestination()
{
$this->validateDestinationTable();
$this->validateSourceTable();
}
/**
* Validation de la table destination.
*/
protected function validateDestinationTable()
{
$connection = $this->destination->getConnection();
$table = $this->destination->getTable();
$sourceCodeColumn = $this->source->getSourceCodeColumn();
// test if table exists
$sql = $this->codeGenerator->generateSQLForTableExistenceCheck($table);
$result = $this->queryExecutor->fetchAll($sql, $connection);
$exists = $this->codeGenerator->convertTableExistenceCheckResultToBoolean($result);
if (!$exists) {
throw new RuntimeException("La table destination '$table' est introuvable. ");
}
// histo columns validation
$columnCount = 0;
$sql = $this->codeGenerator->generateSQLForHistoColumnsValidationInTable($table, $columnCount);
$result = $this->queryExecutor->fetchAll($sql, $connection);
$exception = $this->codeGenerator->convertHistoColumnsValidationBadResultToException($table, $result, $columnCount);
if ($exception !== null) {
throw new RuntimeException("La table '$table' n'est pas une table de destination valide. ", 0, $exception);
}
// source columns validation
$columnCount = 0;
$sql = $this->codeGenerator->generateSQLForSourceColumnsValidationInTable($table, $sourceCodeColumn, $columnCount);
$result = $this->queryExecutor->fetchAll($sql, $connection);
$exception = $this->codeGenerator->convertSourceColumnsValidationBadResultToException($table, $result, $columnCount);
if ($exception !== null) {
throw new RuntimeException("La table '$table' n'est pas une table de destination valide. ", 0, $exception);
}
}
/**
* Validation de la table SOURCE.
*
* ATTENTION : on parle bien ici de la table nommée `SOURCE` qui doit exister dans la base de données destination,
* et non pas de la table contenant les données sources de l'import ou de la synchro.
*/
protected function validateSourceTable()
{
$connection = $this->destination->getConnection();
$table = 'source';
// test if table exists
$sql = $this->codeGenerator->generateSQLForTableExistenceCheck($table);
$result = $this->queryExecutor->fetchAll($sql, $connection);
$exists = $this->codeGenerator->convertTableExistenceCheckResultToBoolean($result);
if (!$exists) {
throw new RuntimeException("La table '$table' requise est introuvable dans la base destination. ");
}
}
/**
*
*/
public function createImportLogTableIfNotExists()
{
$connection = $this->destination->getConnection();
$importLogTable = $this->destination->getLogTable();
// test if table exists
$sql = $this->codeGenerator->generateSQLForTableExistenceCheck($importLogTable);
$result = $this->queryExecutor->fetchAll($sql, $connection);
$exists = $this->codeGenerator->convertTableExistenceCheckResultToBoolean($result);
if ($exists) {
return;
}
// create table
$sql = $this->codeGenerator->generateSQLForImportLogTableCreation($this->destination->getLogTable());
try {
$this->queryExecutor->exec($sql, $connection);
} catch (DBALException $e) {
throw new RuntimeException("Erreur rencontrée lors de la création de la table de log", null, $e);
}
}
public function checkIntermediateTableNotExists($intermediateTable)
{
$sql = $this->codeGenerator->generateSQLForTableExistenceCheck($intermediateTable);
$result = $this->queryExecutor->fetchAll($sql, $this->destination->getConnection());
$exists = $this->codeGenerator->convertTableExistenceCheckResultToBoolean($result);
if ($exists) {
throw new RuntimeException(
"Une table $intermediateTable existe déjà. Veuillez la supprimer ou alors spécifier dans la config " .
"le nom à utiliser pour générer la table intermédiaire nécessaire à l'import dans la table " .
$this->destination->getTable());
}
}
public function createIntermediateTable($intermediateTable)
{
$sql = $this->codeGenerator->generateSQLForIntermediateTableCreation($this->source, $this->destination, $intermediateTable);
try {
$this->queryExecutor->exec($sql, $this->destination->getConnection());
} catch (DBALException $e) {
throw new RuntimeException("Erreur rencontrée lors de la création de la table intermédiaire", null, $e);
}
}
public function dropIntermediateTable($intermediateTable)
{
$sql = $this->codeGenerator->generateSQLForIntermmediateTableDrop($intermediateTable);
try {
$this->queryExecutor->exec($sql, $this->destination->getConnection());
} catch (DBALException $e) {
throw new RuntimeException("Erreur rencontrée lors de la suppression de la table intermédiaire", null, $e);
}
}
public function truncateDestinationTable()
{
$sql = $this->codeGenerator->generateSQLForClearTable($this->destination->getTable());
try {
$this->queryExecutor->exec($sql, $this->destination->getConnection());
} catch (DBALException $e) {
$tableName = $this->destination->getTable();
throw new RuntimeException("Erreur rencontrée lors du vidage de la table destination $tableName", null, $e);
}
}
/**
* @return int
*/
public function populateDestinationTableFromSource()
{
$destinationTable = $this->destination->getTable();
$idColumnSequenceName = $this->destination->getIdColumnSequence();
return $this->populateTableFromSource($destinationTable, $idColumnSequenceName);
}
/**
* @param string $tableName
* @param string|null|false $idColumnSequence
* @return int
*/
public function populateTableFromSource(string $tableName, $idColumnSequence = null)
{
$columns = $this->source->getColumns();
$sourceCodeColumn = $this->source->getSourceCodeColumn();
$columns = array_merge([$sourceCodeColumn], $columns);
$sourceCode = $this->source->getName();
$data = $this->source->getData();
$rows = [];
foreach ($data as $row) {
$preparedRow = $this->prepareSourceDataForDestination($row);
$columnsValues = [];
foreach ($columns as $column) {
$columnsValues[$column] = $preparedRow[$column] ?? null;
}
$rows[] = $columnsValues;
}
$insertsSQL = $this->codeGenerator->generateSQLForInsertIntoTable($tableName, $rows, $sourceCode, $idColumnSequence);
// insert into destination
try {
//Exécution de la requêtes (inserts) dans la base destination
$count = $this->queryExecutor->exec($insertsSQL, $this->destination->getConnection());
} catch (DBALException $e) {
throw new RuntimeException("Erreur rencontrée lors de l'écriture dans la table $tableName", null, $e);
}
return $count;
}
private function prepareSourceDataForDestination(array $row)
{
$columnNameFilter = $this->source->getColumnNameFilter();
$newKeys = array_map([$columnNameFilter, 'filter'], array_keys($row));
$row = array_combine($newKeys, $row);
return $row;
}
/**
* @param string $intermediateTable
*/
public function recreateDiffView($intermediateTable)
{
try {
$sql = $this->codeGenerator->generateSQLForDiffViewCreation($this->source, $this->destination, $intermediateTable);
} catch (DBALException $e) {
throw new RuntimeException("Erreur rencontrées lors de la création de la vue diff", null, $e);
}
try {
$this->queryExecutor->exec($sql, $this->destination->getConnection());
} catch (DBALException $e) {
throw new RuntimeException("Erreur rencontrée lors de la création de la vue différentielle", null, $e);
}
}
/**
* La table IMPORT_OBSERV permet de demander la "détection" de certains changements de valeurs
* lors de la synchro. Les changements détectés sont inscrits dans la table IMPORT_OBSERV_RESULT.
*
* Cette méthode interroge la table IMPORT_OBSERV pour savoir quels changements de valeurs doivent être détectés
* et peuple la table IMPORT_OBSERV_RESULT le cas échéant.
*/
public function populateImportObservResultTable()
{
$qb = $this->importObservService->getRepository()->createQueryBuilder('io')
->andWhere("upper(io.tableName) = upper(:table)")->setParameter('table', $this->destination->getTable())
->andWhere("io.operation = 'UPDATE'")
->andWhere("io.enabled = 1")
->addOrderBy('io.operation, io.tableName, io.columnName');
$importObservRows = $qb->getQuery()->getResult(Query::HYDRATE_ARRAY);
if (!empty($importObservRows)) {
$sql = $this->codeGenerator->generateSQLForInsertionIntoImportObservResult($importObservRows);
try {
$this->queryExecutor->exec($sql, $this->destination->getConnection());
} catch (DBALException $e) {
throw new RuntimeException("Erreur rencontrée lors de l'écriture dans la table IMPORT_OBSERV_RESULT", null, $e);
}
}
}
/**
* @return array
*/
public function executeDiffViewSelectRequest()
{
// interrogation de la vue V_DIFF_
$sql = $this->codeGenerator->generateSQLForDiffViewSelect($this->source, $this->destination);
return $this->queryExecutor->fetchAll($sql, $this->destination->getConnection());
}
/**
* @param array $rows
*/
public function updateDestinationFromDiffViewSelectRequestResult(array $rows)
{
// à partir du diff, génération des instructions SQL d'insert et d'update nécessaires pour mettre à jour la table destination
$sql = $this->codeGenerator->generateSQLForDestinationUpdateFromDiffViewRequestResult($rows, $this->source, $this->destination);
if ($sql) {
$this->queryExecutor->exec($sql, $this->destination->getConnection());
}
}
/**
* @return array [operation => int|Exception]
*/
public function updateDestination()
{
$results = [];
$sql = $this->codeGenerator->generateSQLForDestinationOperation(
Operation::OPERATION_INSERT,
$this->source,
$this->destination
);
$results[Operation::OPERATION_INSERT] = $this->executeSql($sql);
$sql = $this->codeGenerator->generateSQLForDestinationOperation(
Operation::OPERATION_UPDATE,
$this->source,
$this->destination
);
$results[Operation::OPERATION_UPDATE] = $this->executeSql($sql);
$sql = $this->codeGenerator->generateSQLForDestinationOperation(
Operation::OPERATION_DELETE,
$this->source,
$this->destination
);
$results[Operation::OPERATION_DELETE] = $this->executeSql($sql);
$sql = $this->codeGenerator->generateSQLForDestinationOperation(
Operation::OPERATION_UNDELETE,
$this->source,
$this->destination
);
$results[Operation::OPERATION_UNDELETE] = $this->executeSql($sql);
return $results;
}
/**
* @param string $sql
* @return DBALException|int
*/
private function executeSql($sql)
{
try {
return $this->queryExecutor->exec($sql, $this->destination->getConnection());
} catch (DBALException $e) {
return $e;
}
}
/**
* Requête la Source pour obtenir les données sources.
*
* @return array
*/
public function fetchSource()
{
$selectSql = $this->codeGenerator->generateSQLForSelectFromSource($this->source);
return $this->queryExecutor->fetchAll($selectSql, $this->source->getConnection());
}
/**
* Requête la Source pour obtenir le 1er enregistrements des données sources.
*
* @return array
*/
public function fetchSourceFirstRow()
{
$selectSql = $this->codeGenerator->generateSQLForSelectFromSource($this->source);
return $this->queryExecutor->fetch($selectSql, $this->source->getConnection());
}
/**
* @param ResultInterface $result
*/
public function saveResultToLogTable(ResultInterface $result)
{
$importLogTable = $this->destination->getLogTable();
$sql = $this->codeGenerator->generateSQLForInsertResultIntoLogTable($result, $importLogTable);
try {
$this->queryExecutor->exec($sql, $this->destination->getConnection());
} catch (DBALException $e) {
throw new RuntimeException("Erreur rencontrée lors de l'écriture des logs dans la table $importLogTable", null, $e);
}
}
}