Maintain a Custom PG to PG Connection With libpq’s COPY Protocol

Enterprise PostgreSQL Solutions



1. Introduction

Recently in my development work, I needed to maintain a custom connection between a PG backend on the primary and another PG backend on a standby node to communicate custom data, in addition to the existing walsender/walreceiver connection that streams WAL data. Of course, I could create a new standalone backend and maintain a socket connection myself. Technically, that could work, but it creates several problems: the persistence, user security, and data encryption of this custom connection would all need to be handled as well. So, why not let libpq handle all of that for us? In this blog, I will share my experience using libpq’s COPY protocol to maintain a custom connection for custom data, based on PG14.

2. Create new routines in libpqwalreceiver.c

This file is located in src/backend/replication/libpqwalreceiver and is compiled as a shared library (.so) containing libpq-related routines that allow a PG backend to use libpq without having it compiled into the backend code. When a PG backend process needs to use libpq, it first loads the shared library with a load_file() call.

My requirement is simple: the new routines I need are connect, send, and recv, similar to normal socket interactions. I do not define a close function here, because I would like the connection to persist as long as the primary and standby are running. When one of them exits, the connection is terminated automatically once the peer’s disconnection is detected.

3. The Connect Routine

Unlike the libpqrcv_connect routine for replication, my case is much simpler. I just need my standby node to connect to the primary node, so I can simply reuse the standby’s primary_conninfo configuration parameter to connect. This triggers the primary node to fork a new backend process to serve the connection. A code snippet could look like this:

I also set my libpq socket connection to blocking mode and set asyncStatus to PGASYNC_COPY_BOTH to indicate that I will be doing bidirectional data communication.
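A sketch of such a connect routine could look like the following. This is my own simplified version modeled on libpqrcv_connect() in libpqwalreceiver.c; the name netbuf_connect is mine, and the direct asyncStatus assignment requires libpq-int.h, so this only builds inside the PostgreSQL source tree:

```c
/* Hypothetical connect routine, modeled on libpqrcv_connect() in
 * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c. */
WalReceiverConn *
netbuf_connect(const char *conninfo)
{
    WalReceiverConn *conn;

    conn = palloc0(sizeof(WalReceiverConn));
    conn->streamConn = PQconnectdb(conninfo);
    if (PQstatus(conn->streamConn) != CONNECTION_OK)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not connect to primary: %s",
                        pchomp(PQerrorMessage(conn->streamConn)))));

    /* use a blocking socket and mark the connection as COPY-BOTH */
    PQsetnonblocking(conn->streamConn, 0);
    conn->streamConn->asyncStatus = PGASYNC_COPY_BOTH;

    return conn;
}
```

On the standby side, primary_conninfo can simply be passed in as the conninfo argument.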

4. The Send Routine

My send routine is exactly the same as the libpqrcv_send routine for replication; both use PQputCopyData to send streams of data out to the primary. I renamed it for consistency. Snippet below:
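Apart from the name, this sketch matches libpqrcv_send() in libpqwalreceiver.c (the netbuf_send name is my own):

```c
/* Send nbytes of buffer to the primary as a CopyData message. */
static void
netbuf_send(WalReceiverConn *conn, const char *buffer, int nbytes)
{
    if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
        PQflush(conn->streamConn))
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not send data: %s",
                        pchomp(PQerrorMessage(conn->streamConn)))));
}
```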

5. The Recv Routine

This routine is also very similar to the libpqrcv_recv routine for replication and shares almost exactly the same code, except that for my requirement the connection needs to be synchronous. This means my standby will block while waiting for the primary to respond. To make recv synchronous, I pass 0 as the third argument of PQgetCopyData. If you are okay with an asynchronous connection, this routine can look exactly the same as libpqrcv_recv.
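A sketch of the synchronous version, based on the replication recv routine in libpqwalreceiver.c (again, the netbuf_recv name is mine). Passing 0 instead of 1 as the async argument of PQgetCopyData makes the call block until a full CopyData message arrives:

```c
/* Block until one CopyData message arrives; return its length. */
static int
netbuf_recv(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd)
{
    int         rawlen;

    /* free the buffer returned by the previous call, if any */
    if (conn->recvBuf != NULL)
        PQfreemem(conn->recvBuf);
    conn->recvBuf = NULL;

    /* third argument 0 = synchronous: block until data is available */
    rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 0);
    if (rawlen < 0)
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("could not receive data: %s",
                        pchomp(PQerrorMessage(conn->streamConn)))));

    *buffer = conn->recvBuf;
    *wait_fd = PQsocket(conn->streamConn);
    return rawlen;
}
```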

6. Having the Standby Send Out Some Custom Data

Now that we have the libpq wrapper routines made for our own purpose, we can have the standby send some custom data to the primary and wait for a response. Note that I am sending a letter ‘N’ followed by 3 example custom data values: 100, 200, 300. Libpq COPY uses the letter ‘d’ to indicate a CopyData message, and what we are doing here is wrapping our own commands within the ‘d’ message:

    StringInfoData buf_blk_request;
    WalReceiverConn *wrconn;
    char       *tmp;
    pgsocket    fd;
    int         len;

    load_file("libpqwalreceiver", false);

    wrconn = netbuf_connect("dbname=postgres host=127.0.0.1 port=5550");

    /* Build the payload: 'N' followed by 3 example values */
    initStringInfo(&buf_blk_request);
    pq_sendbyte(&buf_blk_request, 'N');
    pq_sendint32(&buf_blk_request, 100);
    pq_sendint32(&buf_blk_request, 200);
    pq_sendint32(&buf_blk_request, 300);

    /* Send it */
    netbuf_send(wrconn, buf_blk_request.data, buf_blk_request.len);

    /* Read the response (blocks until the primary replies) */
    len = netbuf_recv(wrconn, &tmp, &fd);
    if (len > 0)
    {
        /*
         * Something was received from primary
         */
    }

7. Having the Primary Receive the Custom Data

When the standby sends something using the methods above, the main processing loop of the serving backend on the primary (in src/backend/tcop/postgres.c) receives the message and decides what to do. Because we are using the COPY protocol, the first character is ‘d’, for which postgres.c already has a handler. So we need to add additional code under the ‘d’ handler in postgres.c to receive and process the data sent by the standby and provide a response if needed.

    case 'd':           /* copy data */
        {
            int op;

            elog(DEBUG2, "copy data request received");

            op = pq_getmsgbyte(&input_message);
            if (op == 'N')
            {
                StringInfoData buf_blk_reply;
                int data1, data2, data3;

                /* receive custom data here */
                data1 = pq_getmsgint(&input_message, 4);
                data2 = pq_getmsgint(&input_message, 4);
                data3 = pq_getmsgint(&input_message, 4);
                pq_getmsgend(&input_message);

                /* send custom data back to standby here */
                pq_beginmessage(&buf_blk_reply, 'd');
                pq_sendint32(&buf_blk_reply, 400);
                pq_sendint32(&buf_blk_reply, 500);
                pq_sendint32(&buf_blk_reply, 600);
                pq_endmessage(&buf_blk_reply);
                pq_flush();
            }
        }
        break;

8. Summary

Based on libpq’s COPY protocol, I have created a separate communication channel between a primary and a standby node that can be used to communicate custom data much as you would with a regular socket. All of this is built on the COPY protocol that libpq already supports, and within that protocol we wrap our own data. In the examples above, when the standby sends 100, 200, 300 to the primary, the primary receives them and responds with 400, 500, 600. This simple example can be expanded to support other things you may need in your development. Using COPY for my own purpose this way may not be the cleanest approach, but it is what works for me.
