# -*- indent-tabs-mode: nil; -*-
# vim:ft=perl:et:sw=4
# $Id$

# Sympa - SYsteme de Multi-Postage Automatique
#
# Copyright (c) 1997, 1998, 1999 Institut Pasteur & Christophe Wolfhugel
# Copyright (c) 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005,
# 2006, 2007, 2008, 2009, 2010, 2011 Comite Reseau des Universites
# Copyright (c) 2011, 2012, 2013, 2014, 2015, 2016, 2017 GIP RENATER
# Copyright 2018 The Sympa Community. See the AUTHORS.md file at the
# top-level directory of this distribution and at
# <https://github.com/sympa-community/sympa.git>.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
27

28
package Sympa::DatabaseDriver::PostgreSQL;

use strict;
use warnings;
use Encode qw();

use Sympa::Log;

use base qw(Sympa::DatabaseDriver);

# Logger instance used by the methods of this driver.
my $log = Sympa::Log->instance;

# Perl modules this driver depends on.
use constant required_modules => [qw(DBD::Pg)];

42
# Builds the DBI data source string for DBD::Pg.
#
# The string always carries the 'db_name' and 'db_host' attributes of the
# object. ';port=...' is appended when 'db_port' is defined, and the raw
# 'db_options' string is appended after a ';' when it is defined.
#
# Returns the connect string.
sub build_connect_string {
    my $self = shift;

    my $connect_string =
        "DBI:Pg:dbname=$self->{'db_name'};host=$self->{'db_host'}";
    $connect_string .= ';port=' . $self->{'db_port'}
        if defined $self->{'db_port'};
    $connect_string .= ';' . $self->{'db_options'}
        if defined $self->{'db_options'};
    return $connect_string;
}

# Connects through the parent class, then applies the session settings
# this driver relies on.
#
# Returns 1 on success, undef when the parent connect() fails.
sub connect {
    my $self = shift;

    $self->SUPER::connect() or return undef;

    # - Configure Postgres to use ISO format dates.
    # - Set client encoding to UTF8.
    # Note: utf8 flagging must be disabled so that we will consistently use
    # UTF-8 bytestring as internal format.
    $self->__dbh->do("SET DATESTYLE TO 'ISO';");
    $self->__dbh->do("SET NAMES 'utf8'");

    return 1;
}
68

69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Quotes a string through the parent class.
#
# Parameters: the string to quote and an optional data type (a hashref
# with a true 'pg_type' entry marks a typed value).
#
# DBD::Pg 3.x needs the utf8 flag on input parameters even if the
# pg_enable_utf8 option is disabled, so the string is decoded from UTF-8
# first, unless the driver version starts with "2" or the value carries
# an explicit pg_type.
sub quote {
    my ($self, $string, $data_type) = @_;

    my $is_pg2_driver = (index($DBD::Pg::VERSION, '2') == 0);
    my $is_pg_typed =
        (ref $data_type eq 'HASH' and $data_type->{pg_type});

    $string = Encode::decode_utf8($string)
        unless $is_pg2_driver or $is_pg_typed;

    return $self->SUPER::quote($string, $data_type);
}

# Runs a prepared query through the parent class.
#
# Parameters: the query, followed by its bind parameters. A parameter
# that is a reference is passed through untouched together with the
# argument that follows it.
#
# DBD::Pg 3.x needs the utf8 flag on input parameters even if the
# pg_enable_utf8 option is disabled, so plain parameters are decoded from
# UTF-8 unless the driver version starts with "2".
sub do_prepared_query {
    my $self  = shift;
    my $query = shift;

    # Driver 2.x: hand the parameters over as they are.
    return $self->SUPER::do_prepared_query($query, @_)
        if index($DBD::Pg::VERSION, '2') == 0;

    my @decoded;
    while (@_) {
        my $arg = shift;
        if (ref $arg) {
            # Keep the reference and the argument after it unchanged.
            push @decoded, $arg, shift;
        } else {
            push @decoded, Encode::decode_utf8($arg);
        }
    }
    return $self->SUPER::do_prepared_query($query, @decoded);
}

104
# Builds an SQL SUBSTRING() expression.
#
# Parameter: a hashref with 'source_field', 'separator' and
# 'substring_length'. The substring starts at the position of the
# separator inside the source field and spans substring_length
# characters.
#
# Returns the SQL fragment as a string. The values are concatenated into
# the SQL text as they are, without quoting.
sub get_substring_clause {
    my $self  = shift;
    my $param = shift;
    $log->syslog('debug2', 'Building a substring clause');
    return
          "SUBSTRING("
        . $param->{'source_field'}
        . " FROM position('"
        . $param->{'separator'} . "' IN "
        . $param->{'source_field'}
        . ") FOR "
        . $param->{'substring_length'} . ")";
}

118
119
# DEPRECATED.
#sub get_limit_clause ( { rows_count => $rows, offset => $offset } );

# DEPRECATED.
#sub get_formatted_date;
123

124
# Tells whether a field is auto-incremented.
#
# Parameter: a hashref with 'table' and 'field'.
#
# A field counts as auto-incremented when a sequence named
# TABLE_FIELD_seq exists in a schema that is neither a pg_* schema nor
# information_schema.
#
# Returns a true value if the sequence exists, a false value if it does
# not, and undef if the catalog query fails.
sub is_autoinc {
    my $self  = shift;
    my $param = shift;
    $log->syslog('debug', 'Checking whether field %s.%s is an autoincrement',
        $param->{'table'}, $param->{'field'});
    my $seqname = $param->{'table'} . '_' . $param->{'field'} . '_seq';
    my $sth;
    unless (
        $sth = $self->do_prepared_query(
            q{SELECT relname
              FROM pg_class
              WHERE relname = ? AND relkind = 'S' AND
                    relnamespace IN (
                                     SELECT oid
                                     FROM pg_namespace
                                     WHERE nspname NOT LIKE 'pg_%' AND
                                           nspname != 'information_schema'
                                    )},
            $seqname
        )
        ) {
        $log->syslog('err',
            'Unable to gather autoincrement field named %s for table %s',
            $param->{'field'}, $param->{'table'});
        return undef;
    }
    my $field = $sth->fetchrow();
    # No row is fetched when the sequence does not exist; check for undef
    # before comparing so that case yields false without a warning.
    return (defined $field and $field eq $seqname);
}

# Turns a field into an auto-incremented one.
#
# Parameter: a hashref with 'table' and 'field'.
#
# Steps, each aborting with undef on failure:
#   1. create the sequence TABLE_FIELD_seq;
#   2. change the column type to BIGINT;
#   3. make NEXTVAL of the sequence the column default;
#   4. assign a fresh sequence value to the column in existing rows.
#
# Returns 1 on success, undef on failure.
sub set_autoinc {
    my $self  = shift;
    my $param = shift;
    $log->syslog('debug', 'Setting field %s.%s as an auto increment',
        $param->{'table'}, $param->{'field'});
    my $seqname = $param->{'table'} . '_' . $param->{'field'} . '_seq';
    unless ($self->do_query("CREATE SEQUENCE %s", $seqname)) {
        $log->syslog('err', 'Unable to create sequence %s', $seqname);
        return undef;
    }
    unless (
        $self->do_query(
            "ALTER TABLE %s ALTER COLUMN %s TYPE BIGINT", $param->{'table'},
            $param->{'field'}
        )
        ) {
        $log->syslog('err',
            'Unable to set type of field %s in table %s as bigint',
            $param->{'field'}, $param->{'table'});
        return undef;
    }
    unless (
        $self->do_query(
            "ALTER TABLE %s ALTER COLUMN %s SET DEFAULT NEXTVAL('%s')",
            $param->{'table'}, $param->{'field'}, $seqname
        )
        ) {
        $log->syslog(
            'err',
            'Unable to set default value of field %s in table %s as next value of sequence table %s',
            $param->{'field'},
            $param->{'table'},
            $seqname
        );
        return undef;
    }
    unless (
        $self->do_query(
            "UPDATE %s SET %s = NEXTVAL('%s')", $param->{'table'},
            $param->{'field'},                  $seqname
        )
        ) {
        $log->syslog('err',
            'Unable to set sequence %s as value for field %s, table %s',
            $seqname, $param->{'field'}, $param->{'table'});
        return undef;
    }
    return 1;
}
203

204
# Lists the tables of the database.
#
# The schema list is read with current_schemas(false); the tables of each
# schema are then collected in that order, keeping only the first
# occurrence of a table name.
#
# Returns an arrayref of table names, or undef if the schema list cannot
# be read or no table is found.
#
# Note: Pg searches tables in schemas listed in search_path, defaults to be
# '"$user",public'.
sub get_tables {
    my $self = shift;
    $log->syslog('debug3', 'Getting the list of tables in database %s',
        $self->{'db_name'});

    ## get search_path.
    ## The result is an arrayref; needs DBD::Pg >= 2.00 and PostgreSQL > 7.4.
    my $sth;
    unless ($sth = $self->do_query('SELECT current_schemas(false)')) {
        $log->syslog('err', 'Unable to get search_path of database %s',
            $self->{'db_name'});
        return undef;
    }
    my $search_path = $sth->fetchrow;
    $sth->finish;

    ## get table names.
    my @raw_tables;
    # Names already collected, so a table visible in several schemas is
    # listed once.
    my %raw_tables;
    foreach my $schema (@{$search_path || []}) {
        my @tables =
            $self->__dbh->tables(undef, $schema, undef, 'TABLE',
            {pg_noprefix => 1});
        foreach my $t (@tables) {
            next if $raw_tables{$t};
            push @raw_tables, $t;
            $raw_tables{$t} = 1;
        }
    }
    unless (@raw_tables) {
        $log->syslog('err',
            'Unable to retrieve the list of tables from database %s',
            $self->{'db_name'});
        return undef;
    }
    return \@raw_tables;
}

# Creates a table.
#
# Parameter: a hashref with 'table'.
#
# The table is created with a single INT column named "temporary".
#
# Returns a report string on success, undef on failure.
sub add_table {
    my $self  = shift;
    my $param = shift;
    $log->syslog('debug', 'Adding table %s', $param->{'table'});
    unless (
        $self->do_query("CREATE TABLE %s (temporary INT)", $param->{'table'}))
    {
        $log->syslog('err', 'Could not create table %s in database %s',
            $param->{'table'}, $self->{'db_name'});
        return undef;
    }
    return sprintf "Table %s created in database %s", $param->{'table'},
        $self->{'db_name'};
}

# Lists the fields of a table with their types.
#
# Parameter: a hashref with 'table'.
#
# Returns a hashref mapping each field name to its type name as stored in
# pg_type; varchar fields are reported as "varchar(LENGTH)". Returns
# undef if the catalog query fails.
sub get_fields {
    my $self  = shift;
    my $param = shift;
    $log->syslog('debug',
        'Getting the list of fields in table %s, database %s',
        $param->{'table'}, $self->{'db_name'});
    my $sth;
    my %result;
    unless (
        $sth = $self->do_query(
            "SELECT a.attname AS field, t.typname AS type, a.atttypmod AS length FROM pg_class c, pg_attribute a, pg_type t WHERE a.attnum > 0 and a.attrelid = c.oid and c.relname = '%s' and a.atttypid = t.oid order by a.attnum",
            $param->{'table'}
        )
        ) {
        $log->syslog('err',
            'Could not get the list of fields from table %s in database %s',
            $param->{'table'}, $self->{'db_name'});
        return undef;
    }
    while (my $ref = $sth->fetchrow_hashref('NAME_lc')) {
        # What a dirty method ! We give a Sympa tee shirt to anyone that
        # suggest a clean solution ;-)
        # The declared length is derived from atttypmod by subtracting 4.
        my $length = $ref->{'length'} - 4;
        if ($ref->{'type'} eq 'varchar') {
            $result{$ref->{'field'}} = $ref->{'type'} . '(' . $length . ')';
        } else {
            $result{$ref->{'field'}} = $ref->{'type'};
        }
    }
    return \%result;
}

# Changes the type of an existing field.
#
# Parameter: a hashref with 'table', 'field', 'type' and optionally
# 'notnull'.
#
# A timestamptz column converted to an integer type is rebuilt: the old
# column is renamed, a new one is added, the epoch value is copied over
# and the old column is dropped. Any other change is a single
# ALTER COLUMN ... TYPE statement.
#
# Returns a report listing the executed statements, or undef as soon as
# one statement fails.
sub update_field {
    my $self  = shift;
    my $param = shift;
    my $table = $param->{'table'};
    my $field = $param->{'field'};
    my $type  = $param->{'type'};
    $log->syslog('debug3', 'Updating field %s in table %s (%s, %s)',
        $field, $table, $type, $param->{'notnull'});
    my $options = '';
    if ($param->{'notnull'}) {
        $options .= ' NOT NULL ';
    }
    my $report;
    my @sql;

    ## Conversion between timestamp and integer is not obvious.
    ## So create new column then copy contents.
    my $fields = $self->get_fields({'table' => $table});
    # get_fields() may return undef, and the field may be unknown; treat
    # both as "no current type" instead of comparing an undefined value.
    my $current_type =
        ($fields and defined $fields->{$field}) ? $fields->{$field} : '';
    if ($current_type eq 'timestamptz' and $type =~ /^int/i) {
        @sql = (
            "ALTER TABLE $table RENAME $field TO ${field}_tmp",
            "ALTER TABLE $table ADD $field $type$options",
            "UPDATE $table SET $field = date_part('epoch', ${field}_tmp)",
            "ALTER TABLE $table DROP ${field}_tmp"
        );
    } else {
        @sql = sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE %s %s",
            $table, $field, $type, $options);
    }
    foreach my $sql (@sql) {
        $log->syslog('notice', '%s', $sql);
        if ($report) {
            $report .= "\n$sql";
        } else {
            $report = $sql;
        }
        unless ($self->do_query('%s', $sql)) {
            $log->syslog('err', 'Could not change field "%s" in table "%s"',
                $param->{'field'}, $param->{'table'});
            return undef;
        }
    }
    $report .=
        sprintf("\nField %s in table %s, structure updated", $field, $table);
    $log->syslog('info', 'Field %s in table %s, structure updated',
        $field, $table);
    return $report;
}

# Adds a field to a table.
#
# Parameter: a hashref with 'table', 'field', 'type' and the optional
# flags 'notnull' and 'primary'. 'autoinc' is only logged here.
#
# Returns a report string on success, undef on failure.
sub add_field {
    my $self  = shift;
    my $param = shift;
    $log->syslog(
        'debug',             'Adding field %s in table %s (%s, %s, %s, %s)',
        $param->{'field'},   $param->{'table'},
        $param->{'type'},    $param->{'notnull'},
        $param->{'autoinc'}, $param->{'primary'}
    );
    my $options = '';
    # To prevent "Cannot add a NOT NULL column with default value NULL" errors
    if ($param->{'notnull'}) {
        $options .= 'NOT NULL ';
    }
    if ($param->{'primary'}) {
        $options .= ' PRIMARY KEY ';
    }
    unless (
        $self->do_query(
            "ALTER TABLE %s ADD %s %s %s", $param->{'table'},
            $param->{'field'},             $param->{'type'},
            $options
        )
        ) {
        $log->syslog('err',
            'Could not add field %s to table %s in database %s',
            $param->{'field'}, $param->{'table'}, $self->{'db_name'});
        return undef;
    }

    my $report = sprintf('Field %s added to table %s (options : %s)',
        $param->{'field'}, $param->{'table'}, $options);
    $log->syslog('info', 'Field %s added to table %s (options: %s)',
        $param->{'field'}, $param->{'table'}, $options);

    return $report;
}

378
# Removes a field from a table.
#
# Parameter: a hashref with 'table' and 'field'.
#
# Returns a report string on success, undef on failure.
sub delete_field {
    my $self  = shift;
    my $param = shift;
    $log->syslog('debug', 'Deleting field %s from table %s',
        $param->{'field'}, $param->{'table'});

    unless (
        $self->do_query(
            "ALTER TABLE %s DROP COLUMN %s", $param->{'table'},
            $param->{'field'}
        )
        ) {
        $log->syslog('err',
            'Could not delete field %s from table %s in database %s',
            $param->{'field'}, $param->{'table'}, $self->{'db_name'});
        return undef;
    }

    my $report = sprintf('Field %s removed from table %s',
        $param->{'field'}, $param->{'table'});
    $log->syslog('info', 'Field %s removed from table %s',
        $param->{'field'}, $param->{'table'});

    return $report;
}

404
# Lists the fields that make up the primary key of a table.
#
# Parameter: a hashref with 'table'.
#
# Returns a hashref whose keys are the primary key field names (each
# value is 1), or undef if the catalog query fails.
sub get_primary_key {
    my $self  = shift;
    my $param = shift;

    $log->syslog('debug', 'Getting primary key for table %s',
        $param->{'table'});
    my %found_keys;
    my $sth;
    unless (
        $sth = $self->do_query(
            "SELECT pg_attribute.attname AS field FROM pg_index, pg_class, pg_attribute WHERE pg_class.oid ='%s'::regclass AND indrelid = pg_class.oid AND pg_attribute.attrelid = pg_class.oid AND pg_attribute.attnum = any(pg_index.indkey) AND indisprimary",
            $param->{'table'}
        )
        ) {
        $log->syslog('err',
            'Could not get the primary key from table %s in database %s',
            $param->{'table'}, $self->{'db_name'});
        return undef;
    }

    while (my $ref = $sth->fetchrow_hashref('NAME_lc')) {
        $found_keys{$ref->{'field'}} = 1;
    }
    return \%found_keys;
}

430
# Drops the primary key of a table.
#
# Parameter: a hashref with 'table'.
#
# Returns a report string on success. Returns undef if the constraint
# lookup fails, if no primary key constraint is found, or if dropping it
# fails.
sub unset_primary_key {
    my $self  = shift;
    my $param = shift;
    $log->syslog('debug', 'Removing primary key from table %s',
        $param->{'table'});

    my $sth;

    ## PostgreSQL does not have 'ALTER TABLE ... DROP PRIMARY KEY'.
    ## Instead, get a name of constraint then drop it.
    my $key_name;

    unless (
        $sth = $self->do_query(
            q{SELECT tc.constraint_name
              FROM information_schema.table_constraints AS tc
              WHERE tc.table_catalog = %s AND tc.table_name = %s AND
                    tc.constraint_type = 'PRIMARY KEY'},
            $self->quote($self->{'db_name'}), $self->quote($param->{'table'})
        )
        ) {
        $log->syslog('err',
            'Could not search primary key from table %s in database %s',
            $param->{'table'}, $self->{'db_name'});
        return undef;
    }

    $key_name = $sth->fetchrow_array();
    $sth->finish;
    unless (defined $key_name) {
        $log->syslog('err',
            'Could not get primary key from table %s in database %s',
            $param->{'table'}, $self->{'db_name'});
        return undef;
    }

    unless (
        $sth = $self->do_query(
            q{ALTER TABLE %s DROP CONSTRAINT "%s"}, $param->{'table'},
            $key_name
        )
        ) {
        $log->syslog('err',
            'Could not drop primary key "%s" from table %s in database %s',
            $key_name, $param->{'table'}, $self->{'db_name'});
        return undef;
    }

    my $report = "Table $param->{'table'}, PRIMARY KEY dropped";
    $log->syslog('info', 'Table %s, PRIMARY KEY dropped', $param->{'table'});

    return $report;
}

# Defines the primary key of a table.
#
# Parameter: a hashref with 'table' and 'fields' (an arrayref of field
# names).
#
# When the table name ends in "_table", the constraint is given the fixed
# name "ind_" followed by the rest of the table name; otherwise the
# constraint is left unnamed.
#
# Returns a report string on success, undef on failure.
sub set_primary_key {
    my $self  = shift;
    my $param = shift;

    my $sth;

    ## Give fixed key name if possible.
    my $key;
    if ($param->{'table'} =~ /^(.+)_table$/) {
        $key = sprintf 'CONSTRAINT "ind_%s" PRIMARY KEY', $1;
    } else {
        $key = 'PRIMARY KEY';
    }

    my $fields = join ',', @{$param->{'fields'}};
    $log->syslog('debug', 'Setting primary key for table %s (%s)',
        $param->{'table'}, $fields);
    unless (
        $sth = $self->do_query(
            q{ALTER TABLE %s ADD %s (%s)}, $param->{'table'},
            $key,                          $fields
        )
        ) {
        $log->syslog(
            'err',
            'Could not set fields %s as primary key for table %s in database %s',
            $fields,
            $param->{'table'},
            $self->{'db_name'}
        );
        return undef;
    }

    my $report = "Table $param->{'table'}, PRIMARY KEY set on $fields";
    $log->syslog('info', 'Table %s, PRIMARY KEY set on %s',
        $param->{'table'}, $fields);
    return $report;
}

523
# Lists the non-primary indexes defined on a table.
#
# Parameter: a hashref with 'table'.
#
# Returns a hashref of the form { index_name => { field_name => 1 } },
# or undef if the table oid or the index list cannot be read.
sub get_indexes {
    my $self  = shift;
    my $param = shift;

    $log->syslog('debug', 'Getting the indexes defined on table %s',
        $param->{'table'});
    my %found_indexes;
    my $sth;
    unless (
        $sth = $self->do_query(
            q{SELECT c.oid
              FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n
              ON n.oid = c.relnamespace
              WHERE c.relname ~ '^(%s)$' AND
                    pg_catalog.pg_table_is_visible(c.oid)},
            $param->{'table'}
        )
        ) {
        $log->syslog('err',
            'Could not get the oid for table %s in database %s',
            $param->{'table'}, $self->{'db_name'});
        return undef;
    }
    my $ref = $sth->fetchrow_hashref('NAME_lc');
    $sth->finish;
    # No row means the table is not visible; stop here rather than run
    # the index query with an empty oid.
    unless ($ref and defined $ref->{'oid'}) {
        $log->syslog('err',
            'Could not get the oid for table %s in database %s',
            $param->{'table'}, $self->{'db_name'});
        return undef;
    }

    unless (
        $sth = $self->do_query(
            "SELECT c2.relname, pg_catalog.pg_get_indexdef(i.indexrelid, 0, true) AS description FROM pg_catalog.pg_class c, pg_catalog.pg_class c2, pg_catalog.pg_index i WHERE c.oid = \'%s\' AND c.oid = i.indrelid AND i.indexrelid = c2.oid AND NOT i.indisprimary ORDER BY i.indisprimary DESC, i.indisunique DESC, c2.relname",
            $ref->{'oid'}
        )
        ) {
        $log->syslog(
            'err',
            'Could not get the list of indexes from table %s in database %s',
            $param->{'table'},
            $self->{'db_name'}
        );
        return undef;
    }

    while (my $ref = $sth->fetchrow_hashref('NAME_lc')) {
        # Reduce the index definition to its parenthesised column list.
        $ref->{'description'} =~
            s/CREATE INDEX .* ON .* USING .* \((.*)\)$/$1/i;
        # Strip every whitespace character so that no member of a
        # multi-column index keeps a leading blank after the split.
        $ref->{'description'} =~ s/\s//g;
        my @index_members = split ',', $ref->{'description'};
        foreach my $member (@index_members) {
            $found_indexes{$ref->{'relname'}}{$member} = 1;
        }
    }
    return \%found_indexes;
}

575
# Drops an index.
#
# Parameter: a hashref with 'index' (the index name) and 'table' (used
# in messages only).
#
# Returns a report string on success, undef on failure.
sub unset_index {
    my $self  = shift;
    my $param = shift;
    $log->syslog('debug', 'Removing index %s from table %s',
        $param->{'index'}, $param->{'table'});

    my $sth;
    unless ($sth = $self->do_query("DROP INDEX %s", $param->{'index'})) {
        $log->syslog('err',
            'Could not drop index %s from table %s in database %s',
            $param->{'index'}, $param->{'table'}, $self->{'db_name'});
        return undef;
    }
    my $report = "Table $param->{'table'}, index $param->{'index'} dropped";
    $log->syslog('info', 'Table %s, index %s dropped',
        $param->{'table'}, $param->{'index'});

    return $report;
}

# Creates an index on a table.
#
# Parameter: a hashref with 'table', 'index_name' and 'fields' (an
# arrayref of field names).
#
# Returns a report string on success, undef on failure.
sub set_index {
    my $self  = shift;
    my $param = shift;

    my $sth;
    my $fields = join ',', @{$param->{'fields'}};
    $log->syslog(
        'debug',
        'Setting index %s for table %s using fields %s',
        $param->{'index_name'},
        $param->{'table'}, $fields
    );
    unless (
        $sth = $self->do_query(
            "CREATE INDEX %s ON %s (%s)", $param->{'index_name'},
            $param->{'table'},            $fields
        )
        ) {
        # The message has four placeholders: index name, fields, table
        # and database, in that order.
        $log->syslog(
            'err',
            'Could not add index %s using field %s for table %s in database %s',
            $param->{'index_name'},
            $fields,
            $param->{'table'},
            $self->{'db_name'}
        );
        return undef;
    }
    my $report =
        "Table $param->{'table'}, index $param->{'index_name'} set using $fields";
    $log->syslog('info', 'Table %s, index %s set using fields %s',
        $param->{'table'}, $param->{'index_name'}, $fields);
    return $report;
}

628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
# Translates a generic column type name into its PostgreSQL counterpart.
#
# Parameter: the type name. Returns undef when it is false; otherwise the
# rewrite rules below are applied in order and the resulting string is
# returned. A type that matches no rule is returned unchanged.
sub translate_type {
    my ($self, $type) = @_;

    return undef unless $type;

    # PostgreSQL
    # Ordered [pattern, replacement] pairs; a later rule sees the output
    # of the earlier ones.
    my @rules = (
        [qr/^int(1)/,        'smallint'],
        [qr/^int\(?.*\)?/,   'int4'],
        [qr/^smallint.*/,    'int4'],
        [qr/^tinyint\(.*\)/, 'int2'],
        [qr/^bigint.*/,      'int8'],
        [qr/^double/,        'float8'],
        [qr/^text.*/,        'varchar(500)'],
        [qr/^longtext.*/,    'text'],
        [qr/^datetime.*/,    'timestamptz'],
        [qr/^enum.*/,        'varchar(15)'],
        [qr/^mediumblob/,    'bytea'],
    );
    foreach my $rule (@rules) {
        my ($pattern, $replacement) = @{$rule};
        $type =~ s/$pattern/$replacement/g;
    }
    return $type;
}

649
# Marks a bind value as a double precision number.
#
# Called with a value after the invocant, returns a two-element list: a
# hashref { pg_type => PG_FLOAT8 } followed by the value. Called without
# a value, returns an empty list.
sub AS_DOUBLE {
    return ({'pg_type' => DBD::Pg::PG_FLOAT8()} => $_[1])
        if scalar @_ > 1;
    return ();
}

655
# Marks a bind value as binary data.
#
# Called with a value after the invocant, returns a two-element list: a
# hashref { pg_type => PG_BYTEA } followed by the value. Called without
# a value, returns an empty list.
sub AS_BLOB {
    return ({'pg_type' => DBD::Pg::PG_BYTEA()} => $_[1])
        if scalar @_ > 1;
    return ();
}

1;
__END__

=encoding utf-8

=head1 NAME

Sympa::DatabaseDriver::PostgreSQL - Database driver for PostgreSQL

=head1 SEE ALSO

L<Sympa::DatabaseDriver>.

=cut