My favourite feature of BDR by 2ndQuadrant is “multi-master, table-level selective replication”. I am currently preparing a plan to switch from trigger-based replication to low-level BDR replication for better performance.

But how can I detect which replicated tables received changes (needed for memcached invalidation, for example)? I can no longer trap them inside an always-enabled trigger. An alternative is to modify the BDR plugin's C source code. Thanks to the 2ndQuadrant team for the clear source code. The following is my modification of bdr_apply.c:

  1. Module-level variable and function declarations:

     #define DTABLECHUNK 5

     static char **dtables;
     static int dtablessize;
     static int dtablescount;
     static void append_dtables(char *t);

  2. Variable initialization at the end of process_remote_begin:

     static void
     process_remote_begin(StringInfo s)
     {
         ...
         dtables = NULL;
         dtablessize = dtablescount = 0;
     }

  3. Modify the declaration and definition of the read_rel function a bit to catch the table name:

     static BDRRelation *
     read_rel(StringInfo s, LOCKMODE mode, char **dtablename)
     {
         int relnamelen;
         int nspnamelen;
         RangeVar *rv;
         Oid relid;
         MemoryContext oldcontext;

         rv = makeNode(RangeVar);

         nspnamelen = pq_getmsgint(s, 2);
         rv->schemaname = (char *) pq_getmsgbytes(s, nspnamelen);

         relnamelen = pq_getmsgint(s, 2);
         rv->relname = (char *) pq_getmsgbytes(s, relnamelen);

         oldcontext = MemoryContextSwitchTo(MessageContext);
         // nspnamelen and relnamelen are the lengths of the names including
         // the terminating zero, so this buffer fits "schema.relname" exactly
         *dtablename = (char *) palloc((nspnamelen + relnamelen) * sizeof(char));
         MemoryContextSwitchTo(oldcontext);
         sprintf(*dtablename, "%s.%s", rv->schemaname, rv->relname);

         relid = RangeVarGetRelidExtended(rv, mode, false, false, NULL, NULL);

         return bdr_heap_open(relid, NoLock);
     }

  4. Declare a variable char *tablename inside process_remote_insert, process_remote_update and process_remote_delete. Pass its address (&tablename) as the third argument to each read_rel invocation. Then register the table name by calling my append_dtables just after simple_heap_insert, simple_heap_update and simple_heap_delete, as sketched below.
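
     A minimal sketch for process_remote_insert, assuming the existing read_rel call site keeps its original arguments (ellipses mark unchanged bdr_apply.c code; the update and delete handlers change analogously):

     static void
     process_remote_insert(StringInfo s)
     {
         char *tablename;   // filled in by the modified read_rel
         BDRRelation *rel;
         ...
         rel = read_rel(s, RowExclusiveLock, &tablename); // new third argument
         ...
         // unchanged code builds the tuple and calls simple_heap_insert()
         ...
         append_dtables(tablename); // register the changed table
         ...
     }
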
  5. Call an external function inside process_remote_commit to announce which tables received changes, somewhere after the call to CommitTransactionCommand:

     for (i = 0; dtables != NULL && i < dtablescount; i++) {
         // any function call to announce that dtables[i] got changes
     }

  6. And finally, this is my append_dtables definition. Note that the memory is allocated in MessageContext, so it is freed automatically in the bdr_apply_work main loop.

     static void
     append_dtables(char *t)
     {
         int i, allocsize;
         MemoryContext oldcontext;

         for (i = 0; i < dtablescount; i++) {
             if (!strcmp(dtables[i], t)) { // ensure unique table registration
                 oldcontext = MemoryContextSwitchTo(MessageContext);
                 pfree(t);
                 MemoryContextSwitchTo(oldcontext);
                 return;
             }
         }
         if (dtablessize < ++dtablescount) {
             dtablessize += DTABLECHUNK;
             allocsize = dtablessize * sizeof(char *);
             oldcontext = MemoryContextSwitchTo(MessageContext);
             if (dtables == NULL)
                 dtables = (char **) palloc(allocsize);
             else
                 dtables = (char **) repalloc(dtables, allocsize);
             MemoryContextSwitchTo(oldcontext);
         }
         dtables[dtablescount - 1] = t;
     }

What I love about open source projects is that there is always a way to tweak things to suit my development framework.

Cache Invalidation
My previous post with the same title left a question open: how to invalidate the cache and deal with race conditions. Colin 't Hart suggested implementing an “on commit trigger”, which is currently not available in PostgreSQL.

In the good book “PostgreSQL Server Programming” by Jim Mlodgenski, Hannu Krosing and Kirk Roybal, there is a statement that we can register a C-language function to be called on COMMIT or ROLLBACK events.

My idea is to have such a C function through which I can store keys for invalidation, and later call pgmemcache's memcache_delete in response to the COMMIT event:

CREATE OR REPLACE FUNCTION mcache_delete(IN t_keys text[])
RETURNS void
AS '$libdir/mcache', 'mcache_delete' LANGUAGE C VOLATILE;

psql=# begin;
psql=# ... some work ...
psql=# select mcache_delete(ARRAY['dcb50cad', '1f19bee7']); --store keys for invalidation
psql=# ... other work ...
psql=# select mcache_delete(ARRAY['91f29028']); --store other keys for invalidation
psql=# commit; --delete memcache keys

My code:

#include <postgres.h>
#include <fmgr.h>
#include <utils/array.h>
#include <utils/builtins.h>
#include <catalog/pg_type.h>
#include <access/xact.h>
#include <utils/memutils.h>

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

// declare pgmemcache functionality
extern Datum memcache_delete(PG_FUNCTION_ARGS);

void _PG_init(void);
void _PG_fini(void);
static void mcache_xact_callback(XactEvent event, void *arg);
static void mcache_reset(void);

static int mcache_count;
static text** mcache_keys;

PG_FUNCTION_INFO_V1(mcache_delete);
Datum mcache_delete(PG_FUNCTION_ARGS);

Datum mcache_delete(PG_FUNCTION_ARGS) {
  ArrayType* keys;
  Datum* elems;
  bool* nulls;
  int n, i;
  MemoryContext oldCtx;

  if(PG_NARGS() != 1)
    elog(ERROR, "1 argument expected");

  if( !PG_ARGISNULL(0) ){
    keys = PG_GETARG_ARRAYTYPE_P(0);
    deconstruct_array(keys, TEXTOID, -1, false, 'i',
                      &elems, &nulls, &n);
    if(n>0){
      oldCtx = MemoryContextSwitchTo(TopTransactionContext);
      if(mcache_count == 0)
        mcache_keys = (text**)palloc(n * sizeof(text*));
      else
        mcache_keys = (text**)repalloc( (void*)mcache_keys, (mcache_count+n) * sizeof(text*));
      for(i=0; i<n ; i++)
        mcache_keys[mcache_count+i] = nulls[i] ? NULL : DatumGetTextPCopy(elems[i]);
      mcache_count += n;
      MemoryContextSwitchTo(oldCtx);
    }

  }

  PG_RETURN_VOID();
}

void _PG_init(void) {
  mcache_reset();
  RegisterXactCallback(mcache_xact_callback, NULL);
}

void _PG_fini(void) {
  mcache_reset();
}

static void mcache_xact_callback(XactEvent event, void *arg) {
  int i;
  if( event == XACT_EVENT_COMMIT ) {//commit
    for(i=0; i<mcache_count; i++)
      if(mcache_keys[i])
        DirectFunctionCall1(//call pgmemcache function
          memcache_delete,
          PointerGetDatum(mcache_keys[i]) );
    //note: mcache_keys are allocated in TopTransactionContext
    //and implicitly released by internal Postgresql framework
    mcache_reset();
  } else if ( event == XACT_EVENT_ABORT ) //rollback
    mcache_reset();
}

static void mcache_reset(void) {
  mcache_count=0;
  mcache_keys=NULL;
}
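
To build the module as $libdir/mcache, a minimal PGXS Makefile sketch like the following should work (the source file name mcache.c is an assumption):

# Makefile -- build mcache.so via PGXS (assumes the source file is mcache.c)
MODULES = mcache
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

Running make && make install then copies mcache.so into the server's library directory.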

I apologize for my cancelled post titled “Linux System Programming in C”. It was supposed to be posted under another category.

To alleviate database load, my first pick was pgpool-II with memory_cache_enabled turned on, as sketched below. Unfortunately, it does not cache query results from stored procedures.
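
For context, enabling pgpool-II's in-memory query cache amounts to something like this in pgpool.conf (host and port are illustrative; memqcache_method can also be 'shmem'):

memory_cache_enabled = on
memqcache_method = 'memcached'
memqcache_memcached_host = 'localhost'
memqcache_memcached_port = 11211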

The alternative is pgmemcache. It provides interface functions to memcached and can be called from within stored procedures.

Thanks to the pgpool-II idea, the SQL command and its result are cached in a memcached hash:

  • Taking the md5 of the SQL as the key.
  • Forming the set of records into a string as the value.

Assumptions:

  • The memcached server is up and running.
  • The pgmemcache extension is installed in the PostgreSQL server.
  • postgresql.conf has been set accordingly (see the sketch after this list).
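
For the last point, the relevant postgresql.conf entries are roughly the following (the server address is illustrative):

shared_preload_libraries = 'pgmemcache'
pgmemcache.default_servers = 'localhost:11211'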

My case is the query result for the chart of accounts (coa), joining three tables:

SELECT
  coa.id,
  tipe.label AS type_label,
  coa.kode,
  coa.label AS coa_label,
  dbcr.label AS dbcr_label
FROM acct.coa coa
INNER JOIN acct.tipe tipe ON coa.tipe=tipe.id
INNER JOIN acct.dbcr dbcr ON coa.dbcr=dbcr.id
OFFSET 0 LIMIT 25;

I wrap it inside a PL/pgSQL stored procedure. Here is my custom data type:

CREATE TYPE acct.coa_cache AS (
  i_id integer,
  t_tipe text,
  t_kode text,
  t_label text,
  t_dbcr text
);

Stored Procedure:

CREATE OR REPLACE FUNCTION acct.coa_query_page_cache(OUT d acct.coa_cache)
RETURNS SETOF acct.coa_cache AS
$BODY$
DECLARE
  _sql text;
  _key text;
  _v text;
  _ds acct.coa_cache[];
BEGIN
  _sql:= 'SELECT coa.id'
    || ',tipe.label'
    || ',coa.kode'
    || ',coa.label'
    || ',dbcr.label'
    || ' FROM acct.coa coa'
    || ' INNER JOIN acct.tipe tipe ON coa.tipe=tipe.id'
    || ' INNER JOIN acct.dbcr dbcr ON coa.dbcr=dbcr.id'
    || ' OFFSET 0 LIMIT 10';
  _key:=md5(_sql); --taking md5 as key
  _v:=memcache_get(_key); --get hash value by key
  IF _v IS NOT NULL THEN --hit
    raise notice 'hit';
    _ds:=NULLIF(_v,'')::acct.coa_cache[];--convert string to acct.coa_cache array
    RETURN QUERY SELECT * FROM UNNEST(_ds);
  ELSE --miss
    raise notice 'miss';
    FOR d IN EXECUTE _sql
    LOOP
      _ds:= _ds || d;
      RETURN NEXT;
    END LOOP;
    PERFORM memcache_set(_key, _ds::text);--register key/value to memcached
  END IF;
  RETURN;
END;$BODY$
  LANGUAGE plpgsql STABLE SECURITY DEFINER;

Let us check how it works with psql.

psql# \timing on
Timing is on.

First, check the raw SQL performance:

psql# SELECT coa.id,
tipe.label AS tipe_label,
coa.kode,
coa.label,
dbcr.label AS dbcr_label
FROM acct.coa coa
INNER JOIN acct.tipe tipe ON coa.tipe=tipe.id
INNER JOIN acct.dbcr dbcr ON coa.dbcr=dbcr.id
OFFSET 0 LIMIT 10;
 id | tipe_label |   kode    |            label            | dbcr_label
----+------------+-----------+-----------------------------+------------
  1 | AKTIVA     | 1         | AKTIVA                      | DEBET
  2 | AKTIVA     | 1.1       | AKTIVA LANCAR               | DEBET
  3 | AKTIVA     | 1.1.1     | KAS                         | DEBET
  4 | AKTIVA     | 1.1.1.1   | KAS DANA ZAKAT              | DEBET
  5 | AKTIVA     | 1.1.1.2   | Kas Dana Infaq              | DEBET
  6 | AKTIVA     | 1.1.1.2.1 | Kas Dana Infaq Umum         | DEBET
  7 | AKTIVA     | 1.1.1.2.2 | Kas Dana Infaq Yatim Dhuafa | DEBET
  8 | AKTIVA     | 1.1.1.2.3 | KAS DANA INFAQ QURBAN       | DEBET
  9 | AKTIVA     | 1.1.1.3   | Kas Dana Kemanusiaan        | DEBET
 10 | AKTIVA     | 1.1.1.3.1 | Kas Dana Kemanusiaan Umum   | DEBET
(10 rows)

Time: 6.652 ms

Second run: 1.290 ms
Third run: 1.248 ms
Fourth run: 1.238 ms
Fifth run: 1.263 ms
It looks like it saturates at about 1.2 ms in the long run.

Here is how it looks for the memcached version:

psql# select * from acct.coa_query_page_cache();
NOTICE: miss
 i_id | t_tipe |  t_kode   |           t_label           | t_dbcr
------+--------+-----------+-----------------------------+--------
    1 | AKTIVA | 1         | AKTIVA                      | DEBET
    2 | AKTIVA | 1.1       | AKTIVA LANCAR               | DEBET
    3 | AKTIVA | 1.1.1     | KAS                         | DEBET
    4 | AKTIVA | 1.1.1.1   | KAS DANA ZAKAT              | DEBET
    5 | AKTIVA | 1.1.1.2   | Kas Dana Infaq              | DEBET
    6 | AKTIVA | 1.1.1.2.1 | Kas Dana Infaq Umum         | DEBET
    7 | AKTIVA | 1.1.1.2.2 | Kas Dana Infaq Yatim Dhuafa | DEBET
    8 | AKTIVA | 1.1.1.2.3 | KAS DANA INFAQ QURBAN       | DEBET
    9 | AKTIVA | 1.1.1.3   | Kas Dana Kemanusiaan        | DEBET
   10 | AKTIVA | 1.1.1.3.1 | Kas Dana Kemanusiaan Umum   | DEBET
(10 rows)

Time: 3.488 ms
siazcenter=# select * from acct.coa_query_page_cache();
NOTICE: hit
 i_id | t_tipe |  t_kode   |           t_label           | t_dbcr
------+--------+-----------+-----------------------------+--------
    1 | AKTIVA | 1         | AKTIVA                      | DEBET
    2 | AKTIVA | 1.1       | AKTIVA LANCAR               | DEBET
    3 | AKTIVA | 1.1.1     | KAS                         | DEBET
    4 | AKTIVA | 1.1.1.1   | KAS DANA ZAKAT              | DEBET
    5 | AKTIVA | 1.1.1.2   | Kas Dana Infaq              | DEBET
    6 | AKTIVA | 1.1.1.2.1 | Kas Dana Infaq Umum         | DEBET
    7 | AKTIVA | 1.1.1.2.2 | Kas Dana Infaq Yatim Dhuafa | DEBET
    8 | AKTIVA | 1.1.1.2.3 | KAS DANA INFAQ QURBAN       | DEBET
    9 | AKTIVA | 1.1.1.3   | Kas Dana Kemanusiaan        | DEBET
   10 | AKTIVA | 1.1.1.3.1 | Kas Dana Kemanusiaan Umum   | DEBET
(10 rows)

Time: 0.846 ms

Third run: 0.826 ms (hit)
Fourth run: 0.838 ms (hit)
Fifth run: 0.815 ms (hit)

The longest time, 3.488 ms, is for the first run, when the memcached key is missed and the query key/value gets registered. Subsequent runs are faster, saturating at about 0.8 ms in the long run. More importantly, my database load is reduced in a write-once, read-many-times scenario.

Invalidation
How do we invalidate the memcached hash when the underlying tables are updated? In my case, I write a statement trigger and assign it to the underlying tables. It deletes the key/value from memcached whenever any underlying table is modified, as sketched below.
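
A minimal sketch of such a trigger, assuming the key is rebuilt exactly as the md5(_sql) computed in acct.coa_query_page_cache (the function and trigger names are illustrative):

CREATE OR REPLACE FUNCTION acct.coa_cache_invalidate()
RETURNS trigger AS
$BODY$
BEGIN
  -- must match the md5(_sql) key built in acct.coa_query_page_cache
  PERFORM memcache_delete(md5(
    'SELECT coa.id,tipe.label,coa.kode,coa.label,dbcr.label'
    || ' FROM acct.coa coa'
    || ' INNER JOIN acct.tipe tipe ON coa.tipe=tipe.id'
    || ' INNER JOIN acct.dbcr dbcr ON coa.dbcr=dbcr.id'
    || ' OFFSET 0 LIMIT 10'));
  RETURN NULL;
END;
$BODY$ LANGUAGE plpgsql;

CREATE TRIGGER coa_cache_invalidate
AFTER INSERT OR UPDATE OR DELETE ON acct.coa
FOR EACH STATEMENT EXECUTE PROCEDURE acct.coa_cache_invalidate();

The same trigger would be attached to acct.tipe and acct.dbcr as well; in a real setup, the SQL string would be kept in one place so the procedure and the trigger cannot drift apart.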

Thanks to the Bucardo team for responding to my previous post. My cascaded slave replication works as expected.

Today I noticed there is still something to do related to the delta and track tables.
Single table replication scenario:
Db-A/Tbl-T1 (master) => Db-B/Tbl-T2 (slave) => Db-C/Tbl-T3 (cascaded slave)

Every change on table T1 is replicated to T2, then from T2 to T3. After a while, VAC successfully cleans the delta and track tables on Db-A, but not on Db-B.

I detected two issues:
1. If the cascaded replication from T2 to T3 succeeds, the delta table on Db-B is not cleaned up by VAC.
2. If the cascaded replication from T2 to T3 fails before the VAC schedule, the delta table on Db-B is cleaned up by VAC anyway, and the cascaded changes from T2 to T3 are lost.

I fixed it by modifying the SQL inside bucardo.bucardo_purge_delta(text, text) on Db-A and Db-B:

-- Delete all txntimes from the delta table that:
-- 1) Have been used by all dbgroups listed in bucardo_delta_targets
-- 2) Have a matching txntime from the track table
-- 3) Are older than the first argument interval
myst = 'DELETE FROM bucardo.'
  || deltatable
  || ' USING (SELECT track.txntime AS tt FROM bucardo.'
  || tracktable
  || ' track INNER JOIN bucardo.bucardo_delta_targets bdt ON track.target=bdt.target'
  || ' WHERE bdt.tablename::regclass::text='
  || quote_literal($2)
  || ' GROUP BY 1 HAVING COUNT(*) = '
  || drows
  || ') AS foo'
  || ' WHERE txntime = tt'
  || ' AND txntime < now() - interval '
  || quote_literal($1);

I need advice from the Bucardo team on this.

When I read about the new release of Bucardo 5 with its asynchronous multi-master replication capability, I was eager to get my feet wet before swimming. Not really with the multi-master features: just implementing simple master-slave-cascaded-slave replication, as I have done with Slony.

Scenario:
master table A => slave table B => cascaded slave table C.

So, I built Bucardo 5.1.1 from source, then configured databases, tables and syncs. I turned on “makedelta” for the slave table that is the source of the cascaded replication to another slave table. Master-to-slave (A to B) replication ran well. But, unfortunately, the cascaded replication (B to C) did not work.

Then I dug into the Bucardo.pm source code. In sub start_kid, the does_makedelta hash is populated as follows:

sub start_kid {
    ...
    for my $dbname (@dbs_source) {
        $x = $sync->{db}{$dbname};
        for my $g (@$goatlist) {
            ...
            ## Set the per database/per table makedelta setting now
            if (defined $g->{makedelta}) {
                if ($g->{makedelta} eq 'on' or $g->{makedelta} =~ /\b$dbname\b/) {
                    $x->{does_makedelta}{$S}{$T} = 1;
                }
            }
        }
    }
}

I suspect that the does_makedelta hash reference was incorrectly taken from @dbs_source. In my opinion, it should be taken from @dbs_target. So, I moved the does_makedelta population into a new @dbs_target loop:

sub start_kid {
    ...
    for my $dbname (@dbs_source) {
        $x = $sync->{db}{$dbname};
        for my $g (@$goatlist) {
            ...
            ## Set the per database/per table makedelta setting now
            ## if (defined $g->{makedelta}) {
            ##     if ($g->{makedelta} eq 'on' or $g->{makedelta} =~ /\b$dbname\b/) {
            ##         $x->{does_makedelta}{$S}{$T} = 1;
            ##         ...
            ##     }
            ## }
        }
    }

    for my $dbname (@dbs_target) {
        $x = $sync->{db}{$dbname};
        for my $g (@$goatlist) {
            next if $g->{reltype} ne 'table';
            ($S,$T) = ($g->{safeschema},$g->{safetable});
            ## Set the per database/per table makedelta setting now
            if (defined $g->{makedelta}) {
                if ($g->{makedelta} eq 'on' or $g->{makedelta} =~ /\b$dbname\b/) {
                    $x->{does_makedelta}{$S}{$T} = 1;
                    $self->glog("Set table $dbname.$S.$T to makedelta", LOG_NORMAL);
                }
            }
        }
    }
}

I have not verified whether this is valid. But, at least, my cascaded replication works.

To speed up my web application backed by PostgreSQL, query results are cached in memcached. I have been using the pgmemcache extension, so I can communicate with the distributed memory object caching system from within my stored procedures.

The PostgreSQL extension code is based on libmemcached, and I have noticed that starting from version 1.0.16, libmemcached changed the code of the memcached_stat_get_value function (defined in stats.cc): the resulting string is no longer allocated by the customized allocator, but by plain malloc instead. If pgmemcache >= 2.0.1 (2.1.2 in my case) is built against such a libmemcached version, PostgreSQL will crash once you invoke:

abduldb=# select memcache_stats();

The fix is simply to change the memory deallocation for that string from pfree() to free(), after the memcached_stat_get_value call in Datum memcache_stats(PG_FUNCTION_ARGS) or server_stat_function(…) (defined in pgmemcache.c):

char *value = memcached_stat_get_value(ptr, &stat, *stat_ptr, &rc);
appendStringInfo(context, "%s: %s\n", *stat_ptr, value);
//pfree(value);
free(value);

For me, pltoolbox is a great extension by Pavel Stehule. I downloaded and built it on PostgreSQL 9.3.0 and got these warnings:

warning: implicit declaration of function ‘HeapTupleHeaderGetTypeId’
warning: implicit declaration of function ‘HeapTupleHeaderGetTypMod’
warning: implicit declaration of function ‘HeapTupleHeaderGetDatumLength’

If I simply ignore those warnings, the SQL installation fails:

psql -f pltoolbox.sql -U postgres -h localhost -p 5432 -d targetdb
ERROR: could not load library "/opt/pgsql/9.3.0/lib/pltoolbox.so": /opt/pgsql/9.3.0/lib/pltoolbox.so: undefined symbol: HeapTupleHeaderGetTypeId

The fix is simply to put an include statement in record.c:

#include "access/htup_details.h"

After that, the package build and installation went smoothly.
