Detect Inactive Slony Node

July 18, 2011 2 comments

One concern when running Slony as a PostgreSQL replication system is that the sl_event table can easily grow to a huge number of records. So I monitor its size on a weekly basis. At one point it held more than a million records.

After some troubleshooting, I found that the growth was caused by inactive replication nodes that do not confirm the events sent to them. Restarting the Slony daemon on those nodes makes the problem go away.

How do we find out which nodes are inactive? This is my query (assuming the Slony cluster name is clstr):

SELECT t.con_received, t.ts FROM (
SELECT con_received, MAX(con_timestamp) ts
FROM    _clstr.sl_confirm GROUP BY con_received
) t ORDER BY t.ts;

Following is an example of the query output:
 con_received |             ts
--------------+----------------------------
24 | 2011-07-11 08:49:18.004292
26 | 2011-07-18 05:59:33.483955
30 | 2011-07-18 10:31:55.235895
31 | 2011-07-18 12:26:35.889352
22 | 2011-07-18 12:27:02.932156
21 | 2011-07-18 12:27:21.21836
15 | 2011-07-18 12:27:23.298656
18 | 2011-07-18 12:27:23.701799
17 | 2011-07-18 12:27:27.052192
28 | 2011-07-18 13:43:19.826342
11 | 2011-07-18 13:58:25.891667
1 | 2011-07-18 13:58:37.566528
29 | 2011-07-18 13:58:47.321221
16 | 2011-07-18 14:01:08.206147
13 | 2011-07-18 14:01:10.92797
19 | 2011-07-18 14:02:02.476146
14 | 2011-07-18 14:02:02.51104
20 | 2011-07-18 14:02:05.63391
27 | 2011-07-18 14:14:55.750242

It tells us that node 24 has been inactive for 7 days relative to other nodes.
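The same check can be scripted. Below is a minimal Python sketch that takes (node, last confirmation) pairs, such as the rows returned by the query above, and reports the nodes that lag behind the most recent confirmation. The function name inactive_nodes and the one-day threshold are assumptions made for illustration; they are not part of Slony.

```python
from datetime import datetime, timedelta


def inactive_nodes(rows, threshold=timedelta(days=1)):
    """Return (node, lag) pairs for nodes whose last confirmation is older
    than `threshold` compared with the most recent confirmation seen.

    `rows` is a list of (node_id, last_confirm_timestamp) tuples.
    The threshold is an arbitrary choice for this sketch.
    """
    latest = max(ts for _, ts in rows)
    lagging = [(node, latest - ts) for node, ts in rows if latest - ts > threshold]
    # Oldest confirmation first, like ORDER BY t.ts in the query
    return sorted(lagging, key=lambda pair: pair[1], reverse=True)


# Three rows taken from the sample output above
sample = [
    (24, datetime(2011, 7, 11, 8, 49, 18, 4292)),
    (26, datetime(2011, 7, 18, 5, 59, 33, 483955)),
    (27, datetime(2011, 7, 18, 14, 14, 55, 750242)),
]

for node, lag in inactive_nodes(sample):
    print("node %d is behind by %s" % (node, lag))
```

Comparing against the newest confirmation rather than the current time keeps the sketch independent of the clock, which matches the "relative to other nodes" reading of the result.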

Categories: PostgreSQL

Transfer Binary from Photo File to Database Table

December 25, 2010 2 comments
In many web applications, a personal photo is saved as a [*.jpg] file in a folder on the server's disk. The relation to the personal ID in the database is established by naming the file in the format nnnnn.jpg, e.g. 00001.jpg, 00002.jpg and so on.
Once upon a time, I was in charge of moving the content of the photo files into a database field, integrated with the personal identity table. Everything runs in a Linux environment with PostgreSQL 9.0.
I want to share the story….
Original setting: the photo files are saved in the folder /home/webuser/photo/ and the person table resides in the persondb database with DDL as follows:

CREATE TABLE person
(
id serial NOT NULL,
fullname text NOT NULL,
addres text NOT NULL,
CONSTRAINT "person: id must be unique" PRIMARY KEY (id)
) WITH (OIDS=FALSE);

The table has been populated with hundreds of person records.

Part 1: Alter Table
Add a field to store photo bytearray:

ALTER TABLE person ADD COLUMN photo bytea;

Part 2: Create C Source Code
Create person.c:

#include <postgres.h>
#include <fmgr.h>
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
#include <lib/stringinfo.h>
#include <executor/spi.h>
#include <catalog/pg_type.h>
#include <utils/builtins.h>
#include <utils/bytea.h>

/* the C symbol must match the SQL function name declared in person.sql.in */
PG_FUNCTION_INFO_V1(person_photo);
Datum person_photo(PG_FUNCTION_ARGS);

Datum
person_photo(PG_FUNCTION_ARGS)
{
    int32 personId;
    StringInfoData photoFile;
    StringInfoData photoBytea;
    const char* PHOTO_PATH="/home/webuser/photo/";
    const char* PHOTO_EXT="jpg";
    FILE* f;
    const size_t BUFFER_SIZE=5*1024;
    char buf[BUFFER_SIZE];
    size_t byteReadCount;
    const int32 SQL_PARAM_COUNT=2;
    Oid argTypes[SQL_PARAM_COUNT];
    Datum vals[SQL_PARAM_COUNT];

    ////get function argument:BEGIN

    if(PG_ARGISNULL(0)) {
        elog(ERROR,"person id can not be NULL");
    }
    personId=PG_GETARG_INT32(0);

    ////get function argument:END

    ////read photo file:BEGIN

    photoBytea.data=0;
    photoBytea.len=0;
    initStringInfo(&photoFile);
    appendStringInfo(&photoFile, "%s%05d.%s", PHOTO_PATH, personId, PHOTO_EXT);
    f=fopen(photoFile.data, "rb");//open photo file
    pfree(photoFile.data);
    if(f){
        initStringInfo(&photoBytea);
        //keep reading until fread returns no more data (end of file or read error)
        while((byteReadCount=fread(buf, 1, BUFFER_SIZE, f))>0){
            appendBinaryStringInfo(&photoBytea, buf, byteReadCount);
        }
        fclose(f);
    } else {
        elog(NOTICE, "person with id %d does not have photo", personId);
    }

    ////read photo file:END

    ////save to database:BEGIN

    if(photoBytea.len){
        SPI_connect();

        argTypes[0]=INT4OID;
        argTypes[1]=BYTEAOID;
        vals[0]=Int32GetDatum(personId);
        vals[1]=DirectFunctionCall1(bytearecv, PointerGetDatum(&photoBytea));

        SPI_execute_with_args("UPDATE person SET photo=$2 WHERE id=$1",
            SQL_PARAM_COUNT, argTypes, vals, NULL, false, 1);

        SPI_finish();
    }
    if(photoBytea.data){
        pfree(photoBytea.data);
    }

    ////save to database:END

    PG_RETURN_VOID();
}

Part 3: Create SQL to Install Our Custom Library
Create person.sql.in in the same folder as person.c:

CREATE OR REPLACE FUNCTION person_photo(person_id integer)
RETURNS VOID
AS 'MODULE_PATHNAME' LANGUAGE C VOLATILE;

Part 4: Create Makefile
Create Makefile in the same folder as person.c:

MODULE_big = person
OBJS = person.o
DATA_built = person.sql

PG_CPPFLAGS := -I$(shell pg_config --includedir-server)

PGXS := $(shell pg_config --pgxs)
include $(PGXS)

Part 5: Compile and Install
In the same folder as the Makefile, run make and then make install. Make sure we have sufficient privileges to create files in the PostgreSQL contrib and library folders. If everything goes fine, we will have two files:

  1. SQL file person.sql in PostgreSQL contrib directory.
  2. Library file person.so in PostgreSQL library directory.

Part 6: Install Stored Procedure
Assuming we are in the same folder as person.sql and want to install the stored procedure in the persondb database (where the person table exists) on host server1:

psql -U postgres -h server1 -f person.sql -d persondb

Part 7: Move Photo From File to Table
All preparations are done. It is time to log in to the PostgreSQL server and run the stored procedure (make sure the folder /home/webuser/photo/ is readable by the postgres user):

psql -U postgres -h server1 persondb
Password for user postgres:[your password]
persondb=# SELECT person_photo(id) FROM person;

Wait a moment while PostgreSQL reads the photo files and stores the binaries in the photo field.
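For readers who want to check the file handling without compiling the extension, the naming convention and the chunked read can be sketched in Python. This only mirrors the logic of person.c; it is not the stored procedure itself, and the names photo_path and read_photo are hypothetical.

```python
import os
import tempfile


def photo_path(person_id, folder="/home/webuser/photo/", ext="jpg"):
    """Build the file name from the person id, zero-padded to 5 digits."""
    return "%s%05d.%s" % (folder, person_id, ext)


def read_photo(path, buffer_size=5 * 1024):
    """Read a file in fixed-size chunks, like the fread loop in person.c.

    Returns the content as bytes, or None when the file does not exist
    (the C function raises a NOTICE in that case).
    """
    try:
        with open(path, "rb") as f:
            chunks = []
            while True:
                chunk = f.read(buffer_size)
                if not chunk:  # empty read means end of file
                    break
                chunks.append(chunk)
            return b"".join(chunks)
    except FileNotFoundError:
        return None


if __name__ == "__main__":
    # Demo in a temporary folder so nothing outside it is touched
    with tempfile.TemporaryDirectory() as folder:
        path = photo_path(1, folder=folder + os.sep)
        with open(path, "wb") as f:
            f.write(b"\xff\xd8" + b"\x00" * 12000)
        print(os.path.basename(path), len(read_photo(path)))
```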

My story ends….
Categories: PostgreSQL

PgPool II 3.0: bug fixes for md5 authentication

September 25, 2010 2 comments
I had a problem with PgPool II 3.0. I downloaded the source code from http://pgfoundry.org/projects/pgpool, then extracted and compiled it successfully. I configured it with only connection pooling enabled (the other features, Replication, LoadBalance and Parallel Query, are disabled). But I could not connect to PostgreSQL 9.0 via the PgPool port 9999 with the authentication mode set to md5 in pg_hba.conf (pool_hba.conf is disabled).

After taking some time to trace the source code, I found a bug in the file pool_auth.c. Inside the function:

static int do_md5(POOL_CONNECTION *backend, POOL_CONNECTION *frontend, int reauth, int protoMajor)
the variable int size is declared but never receives the length of the password.

4 steps to fix the problem:
  1. Modify the forward declaration of function int read_password_packet(…) by adding int *pwdSize as the fourth argument:
    //static int read_password_packet(POOL_CONNECTION *frontend, int protoMajor, char *password);
    static int read_password_packet(POOL_CONNECTION *frontend, int protoMajor, char *password, int *pwdSize);

  2. Modify the body of function int read_password_packet(…). Make sure that *pwdSize receives the password length at the end of the function body:
    /*
    * Read password packet from frontend
    */
    //static int read_password_packet(POOL_CONNECTION *frontend, int protoMajor, char *password)
    static int read_password_packet(POOL_CONNECTION *frontend, int protoMajor, char *password, int *pwdSize)
    {
      int size;
      /* Read password packet */
      if (protoMajor == PROTO_MAJOR_V2)
      {
        if (pool_read(frontend, &size, sizeof(size)))
        {
          pool_error("read_password_packet: failed to read password packet size");
          return -1;
        }
      }
      else
      {
        char k;
        if (pool_read(frontend, &k, sizeof(k)))
        {
          pool_debug("read_password_packet_password: failed to read password packet \"p\"");
          return -1;
        }
        if (k != 'p')
        {
          pool_error("read_password_packet_password: password packet does not start with \"p\"");
          return -1;
        }
        if (pool_read(frontend, &size, sizeof(size)))
        {
          pool_error("read_password_packet_password: failed to read password packet size");
          return -1;
        }
      }
      if (pool_read(frontend, password, ntohl(size) - 4))
      {
          pool_error("read_password_packet: failed to read password (size: %d)", ntohl(size) - 4);
          return -1;
      }
      *pwdSize=size; //now *pwdSize has password length
      return 0;
    }

  3. Inside the function
    static int do_md5(POOL_CONNECTION *backend, POOL_CONNECTION *frontend, int reauth, int protoMajor)
    add &size as the fourth argument to every read_password_packet function call, for example:

    read_password_packet(frontend, protoMajor, password, &size)

  4. Recompile and reinstall the pgpool project.

Done. Now I can log in to PostgreSQL 9.0 smoothly through the PgPool port 9999.

Categories: PostgreSQL

dbi-link: fix error on insert/update/delete into/from remote mysql table

July 28, 2010 6 comments
I have installed dbi-link version 2.0.0 in my PostgreSQL 8.4 environment. The purpose is to query or modify remote MySQL tables from within PostgreSQL user-defined functions. Whenever I try to insert, update or delete a record in a remote MySQL table, I consistently get this error message:
ERROR: error from Perl function "shadow_trigger_func": Can't call method "quote" on an undefined value at line 61
So I went to the mentioned function and found that the sub functions make_pairs, do_insert, do_update and do_delete try to access the variables $data_source_id and $table, which are declared locally in the main function.
Knowing the problem, I modified the parameter passing as follows:

Within main function:

if ($iud->{ $_TD->{new}{iud_action} }) {
   $iud->{ $_TD->{new}{iud_action} }->({
      payload => $_TD->{new},
      tbl => $table,
      source_id => $data_source_id
   });
}
else {
   die "Trigger event was $_TD->{new}{iud_action}<, but should have been one of I, U or D!"
}

And inside the sub functions, replace $table and $data_source_id with $params->{tbl} and $params->{source_id}, respectively.
Categories: PostgreSQL

Integrated Networks for Educational Research

May 24, 2010 2 comments

The Linux Community in Batam, Indonesia has launched a web application dedicated to educational research by Indonesian teachers. We call it INFER: Integrated Networks For Educational Research (http://infer.gelora-batam.org). It is a Rich Internet Application developed using Ext-GWT, deployed on Tomcat5 and backed by PostgreSQL 8.4 (for its CTE feature). The statistical probability density functions are calculated within stored procedures written in C, which wrap the library provided by GSL (GNU Scientific Library).

At the moment, the service provides analytical tools for correlation and one-way analysis of variance (ANOVA) studies, as well as an exam item validation tool (an online alternative to ITEMAN.EXE by Assessments Systems Corporation).

The Batam Linux Community will keep enhancing the project with more analytical tools.

Categories: PostgreSQL

PLSQL for Curve Smoothing  

May 16, 2009 2 comments

In an engineering project, I was assigned the task of constructing a smooth curve through a given set of sample points. This is accomplished with cubic splines, using the Thomas algorithm to solve the resulting tridiagonal system of linear equations. So I wrote a PL/pgSQL stored procedure as follows:

CREATE OR REPLACE FUNCTION spline(
IN x_arr numeric[], IN y_arr numeric[], IN resolution numeric,
OUT xs numeric, OUT ys numeric)
RETURNS SETOF record AS
$BODY$
DECLARE
  x numeric[];
  y numeric[];
  e numeric[] DEFAULT '{}'::numeric[];
  f numeric[] DEFAULT '{}'::numeric[];
  g numeric[] DEFAULT '{}'::numeric[];
  r numeric[] DEFAULT '{}'::numeric[];
  f2 numeric[] DEFAULT '{}'::numeric[];
  count integer;
  i integer;
  j integer;
  idx integer;
  tmp numeric;
BEGIN
  x:=x_arr;
  y:=y_arr;
  count:=ARRAY_UPPER(x,1);
  IF count<3 THEN
    RAISE EXCEPTION 'sample size must be >=3';
  END IF;
  IF ARRAY_UPPER(y,1)<>count THEN
    RAISE EXCEPTION 'x and y array size must be identical';
  END IF;

--------sort x, ascendingly:
  FOR i IN 1..count-1 LOOP
    idx:=i+1;
    FOR j IN i+2..count LOOP
      IF x[j]<x[idx] THEN
        idx:=j;
      END IF;
    END LOOP;
    IF x[idx]=x[i] THEN
      RAISE EXCEPTION 'x values must be unique';
    ELSIF x[idx]<x[i] THEN --swap
      tmp=x[i];
      x[i]=x[idx];
      x[idx]=tmp;
      tmp=y[i];
      y[i]=y[idx];
      y[idx]=tmp;
    END IF;
  END LOOP;

--------prepare variables for Thomas algorithm
  FOR i IN 1..count LOOP
    IF i=1 OR i=count THEN
      e:= e || 0.0;
      f:= f || 0.0;
      g:= g || 0.0;
      r:= r || 0.0;
      f2:=f2 || 0.0;
      CONTINUE;
    END IF;
    e:= e || x[i] - x[i-1];
    f:= f || 2*(x[i+1]-x[i-1]);
    g:= g || x[i+1] - x[i];
    r:= r || ( (6/(x[i+1]-x[i])) * (y[i+1]-y[i]) )
      + ( (6/(x[i]-x[i-1])) * (y[i-1]-y[i]) );
    f2:=f2 || 0.0;
  END LOOP;

--------decomposition
  FOR i IN 2..count LOOP
    IF f[i-1]<>0 THEN
      e[i]:=e[i]/f[i-1];
    END IF;
    f[i]:=f[i]-(e[i]*g[i-1]);
  END LOOP;

--------forward substitution
  FOR i IN 2..count LOOP
    r[i]:=r[i]-(e[i]*r[i-1]);
  END LOOP;

--------backward substitution
  FOR i IN REVERSE count-1..1 LOOP
    IF f[i]<>0 THEN
      f2[i]:=(r[i] - (g[i]*f2[i+1])) / f[i];
    END IF;
  END LOOP;

--------do interpolation
  FOR i IN 2..count LOOP
    xs:=x[i-1]-resolution;
    LOOP
      xs:=xs+resolution;
      EXIT WHEN i=count AND xs>x[i];
      EXIT WHEN i<count AND xs>=x[i]; --x[i] itself is emitted by the next segment
      ys:= ( (f2[i-1] * (x[i]-xs)^3)
        + (f2[i] * (xs-x[i-1])^3)
        ) / (6*(x[i]-x[i-1]));
      ys:= ys + (
        (y[i-1]/(x[i]-x[i-1])) -
        (f2[i-1]*(x[i]-x[i-1])/6)
        ) * (x[i]-xs);
      ys:= ys + (
        (y[i]/(x[i]-x[i-1])) -
        (f2[i]*(x[i]-x[i-1])/6)
        ) * (xs-x[i-1]);
      RETURN NEXT;
      EXIT WHEN xs>=x[i];
    END LOOP;
  END LOOP;

  RETURN;
END;
$BODY$
LANGUAGE 'plpgsql' IMMUTABLE STRICT;

The stored procedure takes 3 input arguments and 2 output arguments. x_arr and y_arr are the x and y values of the sample points. The resolution argument sets the step between successive x values of the smoothed curve.

Output arguments: xs and ys contain the x and y values of the smoothed curve, respectively.

Let us use the function. First, we have the sample points:

# x y
1 3.3 1.2
2 4.2 -1.1
3 6.8 1.5
4 8.2 1.2
5 10.2 2.5

Call the function, supplying the x and y arrays shown in the table above, and set resolution to 0.1:

SELECT * FROM spline(
  '{3.3,4.2,6.8,8.2,10.2}'::numeric[],
  '{1.2,-1.1,1.5,1.2,2.5}'::numeric[],
  0.1);

We will get a set of records containing xs and ys:

xs  ys
3.3  1.199999999999999970
3.4  0.8860175045157231811
3.5  0.5764170295261004851
3.6  0.2755805955257859750
3.7  -0.0121097769905662565
3.8  -0.28227206752830211620
… and so on

Let us put them in a graph:
[Figure: curve]

In the graph, the blue series (y1) shows the sample points and the red series (y2) shows the interpolated points returned by the spline function.
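As a cross-check of the method, the same natural cubic spline can be sketched outside the database. The Python below is a minimal sketch, not a port of the stored procedure: it assumes the x values are already sorted and unique, uses floating point instead of numeric, and the function names are made up for this example. It solves the tridiagonal system for the second derivatives with the Thomas algorithm and evaluates the same interpolation formula.

```python
from bisect import bisect_right


def natural_spline_second_derivs(xs, ys):
    """Second derivatives at the knots of a natural cubic spline.

    Assumes xs is sorted ascending with unique values. The end knots get
    a second derivative of zero (natural boundary condition).
    """
    n = len(xs)
    if n < 3:
        raise ValueError("sample size must be >= 3")
    h = [xs[i + 1] - xs[i] for i in range(n - 1)]
    # Tridiagonal system for the interior knots 1..n-2
    sub = [h[i - 1] for i in range(1, n - 1)]
    diag = [2.0 * (h[i - 1] + h[i]) for i in range(1, n - 1)]
    sup = [h[i] for i in range(1, n - 1)]
    rhs = [6.0 * ((ys[i + 1] - ys[i]) / h[i] - (ys[i] - ys[i - 1]) / h[i - 1])
           for i in range(1, n - 1)]
    # Thomas algorithm: forward elimination
    for k in range(1, len(diag)):
        m = sub[k] / diag[k - 1]
        diag[k] -= m * sup[k - 1]
        rhs[k] -= m * rhs[k - 1]
    # Thomas algorithm: back substitution (interior row k is knot k+1)
    m2 = [0.0] * n
    for k in range(len(diag) - 1, -1, -1):
        m2[k + 1] = (rhs[k] - sup[k] * m2[k + 2]) / diag[k]
    return m2


def spline_eval(xs, ys, m2, x):
    """Evaluate the spline at x, clamping to the first or last segment."""
    i = min(max(bisect_right(xs, x) - 1, 0), len(xs) - 2)
    h = xs[i + 1] - xs[i]
    a = xs[i + 1] - x
    b = x - xs[i]
    return ((m2[i] * a ** 3 + m2[i + 1] * b ** 3) / (6.0 * h)
            + (ys[i] / h - m2[i] * h / 6.0) * a
            + (ys[i + 1] / h - m2[i + 1] * h / 6.0) * b)


sample_x = [3.3, 4.2, 6.8, 8.2, 10.2]
sample_y = [1.2, -1.1, 1.5, 1.2, 2.5]
second = natural_spline_second_derivs(sample_x, sample_y)
for x in (3.3, 3.4, 3.5):
    print(x, spline_eval(sample_x, sample_y, second, x))
```

The printed values can be compared with the first rows of the result set shown earlier; small differences in the last digits are expected because of floating point arithmetic.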

Categories: PostgreSQL

Tree-structured data and Nested Hash

February 6, 2009 6 comments

I have just finished a task to transfer a tree-structured data set from a client application into the body of a PostgreSQL PL/pgSQL function. The tree on the client side is structured as shown below:

  |--Manager1
  |    |
  |    |--Senior1
  |    |    |--Junior1
  |    |    |--Junior2
  |    |
  |    |--Senior2
  |         |--Junior3
  |         |--Junior4
  |
  |--Manager2
      |
      |--Senior3
      |    |--Junior5
      |    |--Junior6
      |
      |--Senior4
           |--Junior7
           |--Junior8

Instead of sending the items one by one, I want to send all of them at once.

PL/pgSQL Function (convert array of ltree into nested hash)

I decided to package the tree items in an array of the LTREE type, which is provided as a PostgreSQL contrib module by Teodor Sigaev and Oleg Bartunov. Later, in the function body, I convert the LTREE array into HSTORE (again thanks to Teodor Sigaev and Oleg Bartunov). HSTORE maps keys to values, and both are strings. In my case, however, the hash contains keys whose values are themselves hashes stored as text, which creates a nested hash.

Following is the stored procedure in PL/pgSQL, created on PostgreSQL 8.3.1:

CREATE OR REPLACE FUNCTION organization(IN trees ltree[]) RETURNS void AS
$BODY$
DECLARE
--------ltree[] will be converted to nested hash:
-------------hash_mgr=>hash_snr=>array(Juniors)
    i integer;
    count integer;
    hash_mgr hstore;
    hash_snr hstore;
    tree ltree;
    arr text[];
    mgr text;
    snr text;
    jnr text;
    k1 text; v1 text;
    k2 text; v2 text;
BEGIN
    count:=ARRAY_UPPER(trees,1);
--------convert array of trees into nested hash:
    FOR i IN 1..count LOOP
        tree:=trees[i];
        mgr:=SUBLTREE(tree,0,1);
        snr:=SUBLTREE(tree,1,2);
        jnr:=SUBLTREE(tree,2,3);
        hash_snr:=(hash_mgr->mgr)::hstore;
        arr:=(hash_snr->snr)::text[];
        arr:=COALESCE(arr,'{}'::text[]) || jnr;
        hash_snr:=COALESCE( hash_snr||(snr=>arr::text),snr=>arr::text);
        hash_mgr:=COALESCE( hash_mgr||(mgr=>hash_snr::text),mgr=>hash_snr::text);
    END LOOP;
--------now we have nested hash
--------usage 1: get Junior1 and Junior2:
    arr:=((hash_mgr->'Manager1')::hstore)->'Senior1';
    RAISE NOTICE '% %',arr[1],arr[2];
--------navigate all items
    FOR k1, v1 IN SELECT key, value FROM EACH(hash_mgr) LOOP
        RAISE NOTICE 'manager: %',k1;
        FOR k2, v2 IN SELECT key, value FROM EACH(v1::hstore) LOOP
            RAISE NOTICE ' senior: %',k2;
            RAISE NOTICE ' junior: %',(v2::text[])[1];
            RAISE NOTICE ' junior: %',(v2::text[])[2];
        END LOOP;
    END LOOP;
    RETURN;
END;$BODY$
LANGUAGE 'plpgsql' STABLE;

Let’s start from the lowest level. Hash hash_snr maps each senior to an array of juniors.
One level up, hash hash_mgr maps each manager to a hash_snr hash.

To use the function, the client application should invoke it as follows:

SELECT organization('{
    Manager1.Senior1.Junior1,
    Manager1.Senior1.Junior2,
    Manager1.Senior2.Junior3,
    Manager1.Senior2.Junior4,
    Manager2.Senior3.Junior5,
    Manager2.Senior3.Junior6,
    Manager2.Senior4.Junior7,
    Manager2.Senior4.Junior8
    }');

Take note of the RAISE NOTICE statements in the lower portion of the function body. They give us messages as follows:

NOTICE:    Junior1 Junior2
NOTICE:    manager: Manager1
NOTICE:        senior: Senior1
NOTICE:                junior: Junior1
NOTICE:                junior: Junior2
NOTICE:        senior: Senior2
NOTICE:                junior: Junior3
NOTICE:                junior: Junior4
NOTICE:    manager: Manager2
NOTICE:        senior: Senior3
NOTICE:                junior: Junior5
NOTICE:                junior: Junior6
NOTICE:        senior: Senior4
NOTICE:                junior: Junior7
NOTICE:                junior: Junior8
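The nested-hash idea itself is independent of hstore. As a minimal sketch in Python (the function name build_nested_hash is made up for this example, and the input is assumed to be well-formed three-level paths), the same dotted paths can be folded into a dictionary of dictionaries of lists:

```python
def build_nested_hash(paths):
    """Turn 'Manager.Senior.Junior' paths into {manager: {senior: [juniors]}}."""
    tree = {}
    for path in paths:
        mgr, snr, jnr = path.split(".")  # assumes exactly three levels
        tree.setdefault(mgr, {}).setdefault(snr, []).append(jnr)
    return tree


paths = [
    "Manager1.Senior1.Junior1",
    "Manager1.Senior1.Junior2",
    "Manager1.Senior2.Junior3",
    "Manager1.Senior2.Junior4",
    "Manager2.Senior3.Junior5",
    "Manager2.Senior3.Junior6",
    "Manager2.Senior4.Junior7",
    "Manager2.Senior4.Junior8",
]

organization = build_nested_hash(paths)
for mgr, seniors in organization.items():
    print("manager:", mgr)
    for snr, juniors in seniors.items():
        print("    senior:", snr)
        for jnr in juniors:
            print("        junior:", jnr)
```

The difference from the PL/pgSQL version is that hstore values must be text, so each inner hash and array there is serialized to a string and cast back on access, while the Python dictionary holds the nested objects directly.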
Categories: PostgreSQL

Prevent Concurrent Update in Read-Committed Transaction

December 4, 2008 4 comments

To prevent concurrent updates on table records, I could simply wrap the updates in a serializable transaction. But, quoting the Annotated postgresql.conf and Global User Configuration (GUC) Guide (http://www.varlena.com/GeneralBits/Tidbits/annotated_conf_e.html):

Under a heavy multi-user load, setting “serializable” can impose a significant penalty as numerous transactions are forced to wait for the serialized transaction to complete.

I want to share my simple way of implementing this protection at PostgreSQL's default transaction isolation level: READ COMMITTED.

First, define the table:

CREATE TABLE inventory(
  id serial NOT NULL,
  item text NOT NULL,
  qty integer NOT NULL,
  ver integer NOT NULL DEFAULT 1,
  CONSTRAINT inventory_pkey PRIMARY KEY (id)
)
WITH (OIDS=FALSE);

The last field, ver, is the focal point of the concurrent update protection.

Let’s create a trigger function in PL/pgSQL to be fired BEFORE INSERT and BEFORE UPDATE.

CREATE OR REPLACE FUNCTION trig_inventory()
RETURNS trigger AS
$BODY$
BEGIN
  IF (TG_WHEN = 'BEFORE' AND TG_OP = 'INSERT') THEN
    NEW.ver = 1; --force ver initiated with 1
    RETURN NEW;
  ELSIF (TG_WHEN = 'BEFORE' AND TG_OP = 'UPDATE') THEN
    IF OLD.ver > NEW.ver THEN --somebody has updated this record
      RAISE EXCEPTION 'Concurrent Update Protection!';
    END IF;
    NEW.ver = OLD.ver + 1; --increment UPDATE
    RETURN NEW;
  END IF;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE;

Assign the trigger function to the inventory table:

CREATE TRIGGER inventory_before_insert BEFORE INSERT
  ON inventory FOR EACH ROW EXECUTE PROCEDURE public.trig_inventory();
CREATE TRIGGER inventory_before_update BEFORE UPDATE
  ON inventory FOR EACH ROW EXECUTE PROCEDURE public.trig_inventory();

Now, let us try the feature (using psql).
First, insert a record:

citra=# INSERT INTO inventory (item,qty) VALUES ('ITEM_1',50) RETURNING *;
 id |  item  | qty | ver
----+--------+-----+-----
  2 | ITEM_1 |  50 |   1
(1 row)

INSERT 0 1

Please note that the field ver has been initialized to 1.
Now, update the record to check that ver is incremented:

citra=# BEGIN;
BEGIN
citra=# UPDATE inventory SET qty=qty+100 WHERE id=2 RETURNING *;
 id |  item  | qty | ver
----+--------+-----+-----
  2 | ITEM_1 | 150 |   2
(1 row)

UPDATE 1
citra=# COMMIT;

Finally, let’s simulate the concurrent update protection. Besides the current psql session, please open another session.
For simplicity, I assume that the users in both sessions have queried the record and seen that ver equals 2. Make sure that the ver field is included in the UPDATE command.

session #1                                                              | session #2
citra=# BEGIN;                                                          | citra=# BEGIN;
citra=# UPDATE inventory SET qty=qty+100,ver=2 WHERE id=2 RETURNING *;  | citra=# UPDATE inventory SET qty=qty+100,ver=2 WHERE id=2 RETURNING *;
citra=# COMMIT;                                                         | citra=# COMMIT;
                                                                        | ERROR: Concurrent Update Protection!

The second session gets the error message. Its UPDATE waits until the first session commits; the trigger then sees OLD.ver = 3 against NEW.ver = 2 and raises the exception. The user must re-query the record to get the latest ver and use it in the next UPDATE command.
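The version check can be illustrated without a database. The Python below is a simplified sketch: it models only the comparison made by the BEFORE UPDATE branch of the trigger, not PostgreSQL's row locking, and the helper names before_update and update_qty are invented for this example.

```python
class ConcurrentUpdateError(Exception):
    """Raised when an update carries a stale version number."""


def before_update(old, new):
    """Mimic the BEFORE UPDATE branch of trig_inventory()."""
    if old["ver"] > new["ver"]:  # somebody has updated this record
        raise ConcurrentUpdateError("Concurrent Update Protection!")
    return dict(new, ver=old["ver"] + 1)


def update_qty(table, row_id, delta, ver_seen):
    """Apply qty = qty + delta, sending the version the client last saw."""
    old = table[row_id]
    new = dict(old, qty=old["qty"] + delta, ver=ver_seen)
    table[row_id] = before_update(old, new)
    return table[row_id]


inventory = {2: {"id": 2, "item": "ITEM_1", "qty": 150, "ver": 2}}

# Session 1 saw ver = 2 and updates first
print(update_qty(inventory, 2, 100, ver_seen=2))

# Session 2 also saw ver = 2, but the stored row has moved on to ver = 3
try:
    update_qty(inventory, 2, 100, ver_seen=2)
except ConcurrentUpdateError as exc:
    print("ERROR:", exc)
```

As in the trigger, the protection depends on the client sending the version it read; an update that leaves ver out is not checked.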

Categories: PostgreSQL

Cross-tab Report

November 27, 2008 12 comments

Another use case for an aggregate function. A sales table is defined and contains raw records of sales activity as follows:

CREATE TABLE sales
(
  id serial NOT NULL,
  item text NOT NULL,
  sales_unit text NOT NULL,
  amount numeric NOT NULL,
  CONSTRAINT sales_pkey PRIMARY KEY (id)
)
WITH (OIDS=FALSE);

id item    sales_unit amount
1  ITEM_1  UNIT_1     1000.5
2  ITEM_1  UNIT_2     1100
3  ITEM_1  UNIT_3     1150.25
4  ITEM_1  UNIT_1     100.75
5  ITEM_2  UNIT_2     275
6  ITEM_2  UNIT_3     750.5
7  ITEM_2  UNIT_2     300.5

The total sales amount for each sales unit and each item will be presented in cross-tab format:

item    UNIT_1   UNIT_2   UNIT_3
ITEM_1  1101.25  1100     1150.25
ITEM_2  0        575.5    750.5

To create the aggregate function, let us first prepare the state transition function:

CREATE OR REPLACE FUNCTION sales_amount(
  amounts numeric[],
  unit text,
  amount numeric,
  units text,
  unitcount integer)
RETURNS numeric[] AS
$BODY$
DECLARE
  out_amounts numeric[];
  unit_pattern text;
  sub_unit text[];
  idx integer;
BEGIN
  out_amounts:=amounts;
  --------units in format: [unit_name]:[array_index],[unit_name]:[array_index],…
  unit_pattern:= unit || ':[0-9]{1,}';
  sub_unit:=regexp_matches(units,unit_pattern);
  IF sub_unit IS NULL THEN
    RETURN out_amounts;
  END IF;
  --------get array_index for dept
  idx:=split_part(array_to_string(sub_unit,''),':',2)::integer;
  IF array_upper(out_amounts,1) IS NULL THEN
  --------initiate amount array with 0
    out_amounts:=string_to_array(
    '0' || repeat( ',0', unitcount-1 ) ,
    ',' )::numeric[];
  END IF;
  out_amounts[idx]:=out_amounts[idx]+amount;
  RETURN out_amounts;
END;$BODY$
LANGUAGE 'plpgsql' IMMUTABLE STRICT;

Here is the aggregate function:

CREATE AGGREGATE sales_agg(text, numeric, text, integer) (
  SFUNC=sales_amount,
  STYPE=numeric[],
  INITCOND='{}'
);

Now, let us create a user-defined function:

CREATE OR REPLACE FUNCTION create_sales_report(
  refcursor, --cursor name
  units text[] --array of the units expected to appear on the cross-tab report
)
RETURNS void AS
$BODY$
DECLARE
  unit_count integer;
  units_series text;
  i integer;
  sql text;
BEGIN
  unit_count:=array_upper(units,1);
  --------convert units into [dept_name]:[array_index],[dept_name]:[array_index],…
  --------to avoid repetitive loops inside aggregate function
  units_series:='';
  FOR i in 1..unit_count LOOP
    IF i>1 THEN
      units_series:= units_series || ',';
    END IF;
    units_series:= units_series || units[i] || ':' || i;
  END LOOP;
  --------compose sql statement
  sql:=    'SELECT t.item';
  i:=1;
  WHILE units[i] IS NOT NULL LOOP
    sql:= sql || ',t.amounts[' || i || '] AS ' || quote_ident(units[i]);
    i:=i+1;
  END LOOP;
  sql:=sql || ' FROM (SELECT item,'
    || 'sales_agg(sales_unit,amount,'
    || quote_literal(units_series) || ','
    || unit_count || ') AS amounts '
    || 'FROM sales GROUP BY item'
    || ') t ORDER BY t.item';
  --------open cursor
  OPEN $1 FOR EXECUTE sql;
  RETURN;
END;$BODY$
LANGUAGE 'plpgsql' IMMUTABLE STRICT;

Following is how the function is called in the psql console:

citra=# begin;
BEGIN
citra=# select create_sales_report('report','{UNIT_1,UNIT_2,UNIT_3}');
 create_sales_report
---------------------

(1 row)

citra=# FETCH ALL IN report;
  item  | UNIT_1  | UNIT_2 | UNIT_3
--------+---------+--------+---------
 ITEM_1 | 1101.25 |   1100 | 1150.25
 ITEM_2 |       0 |  575.5 |   750.5
(2 rows)

citra=# COMMIT;
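The logic of the aggregate can be sketched in a few lines of Python. This is an illustration of the idea only, with a made-up function name cross_tab: each unit is mapped to a column index once, and every sales row adds its amount to the matching slot of its item, which is what sales_amount does with the [unit_name]:[array_index] series.

```python
from collections import defaultdict
from decimal import Decimal


def cross_tab(rows, units):
    """Sum amounts per item into one column per requested unit.

    rows  : iterable of (item, sales_unit, amount)
    units : list of unit names, in column order
    Units that are not requested are ignored, as in the SQL version.
    """
    index = {unit: i for i, unit in enumerate(units)}
    table = defaultdict(lambda: [Decimal(0)] * len(units))
    for item, unit, amount in rows:
        if unit in index:
            table[item][index[unit]] += amount
    return dict(sorted(table.items()))  # ORDER BY item


sales = [
    ("ITEM_1", "UNIT_1", Decimal("1000.5")),
    ("ITEM_1", "UNIT_2", Decimal("1100")),
    ("ITEM_1", "UNIT_3", Decimal("1150.25")),
    ("ITEM_1", "UNIT_1", Decimal("100.75")),
    ("ITEM_2", "UNIT_2", Decimal("275")),
    ("ITEM_2", "UNIT_3", Decimal("750.5")),
    ("ITEM_2", "UNIT_2", Decimal("300.5")),
]

report = cross_tab(sales, ["UNIT_1", "UNIT_2", "UNIT_3"])
for item, amounts in report.items():
    print(item, [str(a) for a in amounts])
```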

The query can be extended to cover more sales units:

#select create_sales_report('report','{UNIT_1,UNIT_2,UNIT_3,…,…}');

Or, to display all sales units available in the sales table, just use array_agg (see my previous post):

#select create_sales_report('report',
  (SELECT array_agg(t.sales_unit) FROM (SELECT DISTINCT ON (sales_unit) sales_unit FROM sales) t)
);
Categories: PostgreSQL

array_agg for PostgreSQL version < 8.4

November 24, 2008 12 comments

In relation to the article by Hubert Lubaczewski, ‘Waiting for 8.4 – array aggregate and array unpacker’, I needed the former feature for one of my projects.

I defined my aggregate function as follows:

CREATE AGGREGATE array_agg(anyelement) (
SFUNC=array_append,
STYPE=anyarray,
INITCOND='{}'
);

A sample table definition looks like this:

CREATE TABLE tes
(
id integer NOT NULL,
grp integer NOT NULL,
label text NOT NULL,
CONSTRAINT tes_pkey PRIMARY KEY (id)
) WITH (OIDS=FALSE);

Populate the table so it contains these records:

id  grp  label
1   1    ABC
2   1    DEF
3   2    GHI
4   2    JKL

Run query:
SELECT grp, array_agg(label) FROM tes GROUP BY grp;
Result:
grp array_agg
1 {ABC,DEF}
2 {GHI,JKL}

The array in the second column can then, of course, be converted to a string:
SELECT grp,array_to_string( array_agg(label), ',' ) FROM tes GROUP BY grp;
Result:
grp array_to_string
1 ABC,DEF
2 GHI,JKL
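How the aggregate definition works can be sketched in Python: the state starts at INITCOND, and the state transition function (SFUNC) is applied once per input row of a group. The helper names below are chosen to echo the SQL and are assumptions of this sketch, not PostgreSQL APIs.

```python
def array_append(state, element):
    """State transition function: return the state with one more element."""
    return state + [element]


def aggregate(values, sfunc, initcond):
    """Fold the values of one group through the state transition function."""
    state = initcond
    for value in values:
        state = sfunc(state, value)
    return state


def array_agg_by_group(rows):
    """Equivalent of SELECT grp, array_agg(label) ... GROUP BY grp."""
    groups = {}
    for grp, label in rows:
        groups.setdefault(grp, []).append(label)
    return {grp: aggregate(labels, array_append, [])
            for grp, labels in groups.items()}


tes = [(1, "ABC"), (1, "DEF"), (2, "GHI"), (2, "JKL")]  # (grp, label)
result = array_agg_by_group(tes)
for grp, labels in result.items():
    print(grp, labels, ",".join(labels))
```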

Categories: PostgreSQL