Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions collector.c
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,10 @@ pgws_collector_main(Datum main_arg)
/* Handle request if any */
if (pgws_collector_hdr->request != NO_REQUEST)
{
LOCKTAG tag;
SHMRequest request;

pgws_init_lock_tag(&tag, PGWS_COLLECTOR_LOCK);
LWLockAcquire(collector_lock, LW_EXCLUSIVE);

LockAcquire(&tag, ExclusiveLock, false, false);
request = pgws_collector_hdr->request;
pgws_collector_hdr->request = NO_REQUEST;

Expand Down Expand Up @@ -475,7 +473,7 @@ pgws_collector_main(Datum main_arg)
hash_destroy(profile_hash);
profile_hash = make_profile_hash();
}
LockRelease(&tag, ExclusiveLock, false);
LWLockRelease(collector_lock);
}
}

Expand Down
59 changes: 26 additions & 33 deletions pg_wait_sampling.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ shm_mq *pgws_collector_mq = NULL;
uint64 *pgws_proc_queryids = NULL;
CollectorShmqHeader *pgws_collector_hdr = NULL;

/* Receiver (backend) local shm_mq pointers and lock */
/* Receiver (backend) local shm_mq pointers */
static shm_mq *recv_mq = NULL;
static shm_mq_handle *recv_mqh = NULL;
static LOCKTAG queueTag;

/* LWLock pointers */
LWLock *queue_lock;
LWLock *collector_lock;

/* Hook functions */
#if PG_VERSION_NUM >= 150000
Expand Down Expand Up @@ -239,6 +242,10 @@ pgws_shmem_request(void)
prev_shmem_request_hook();

RequestAddinShmemSpace(pgws_shmem_size());

/* We request two different LWLock Tranches for ease of use */
RequestNamedLWLockTranche(PGWS_QUEUE_LOCK_NAME, 1);
RequestNamedLWLockTranche(PGWS_COLLECTOR_LOCK_NAME, 1);
}
#endif

Expand All @@ -258,6 +265,9 @@ pgws_shmem_startup(void)
if (!found)
{
/* Create shared objects */
queue_lock = &(GetNamedLWLockTranche(PGWS_QUEUE_LOCK_NAME))->lock;
collector_lock = &(GetNamedLWLockTranche(PGWS_COLLECTOR_LOCK_NAME))->lock;

toc = shm_toc_create(PG_WAIT_SAMPLING_MAGIC, pgws, segsize);

pgws_collector_hdr = shm_toc_allocate(toc, sizeof(CollectorShmqHeader));
Expand Down Expand Up @@ -302,7 +312,7 @@ pgws_cleanup_callback(int code, Datum arg)
{
elog(DEBUG3, "pg_wait_sampling cleanup: detaching shm_mq and releasing queue lock");
shm_mq_detach(recv_mqh);
LockRelease(&queueTag, ExclusiveLock, false);
LWLockRelease(queue_lock);
}

/*
Expand All @@ -322,9 +332,13 @@ _PG_init(void)
* resources in pgws_shmem_startup().
*
* If you change code here, don't forget to also report the modifications
* in pgsp_shmem_request() for pg15 and later.
* in pgws_shmem_request() for pg15 and later.
*/
RequestAddinShmemSpace(pgws_shmem_size());

/* We request two different LWLock Tranches for ease of use */
RequestNamedLWLockTranche(PGWS_QUEUE_LOCK_NAME, 1);
RequestNamedLWLockTranche(PGWS_COLLECTOR_LOCK_NAME, 1);
#endif

pgws_register_wait_collector();
Expand Down Expand Up @@ -619,22 +633,10 @@ typedef struct
ProfileItem *items;
} Profile;

void
pgws_init_lock_tag(LOCKTAG *tag, uint32 lock)
{
tag->locktag_field1 = PG_WAIT_SAMPLING_MAGIC;
tag->locktag_field2 = lock;
tag->locktag_field3 = 0;
tag->locktag_field4 = 0;
tag->locktag_type = LOCKTAG_USERLOCK;
tag->locktag_lockmethodid = USER_LOCKMETHOD;
}

/* Get array (history or profile data) from shared memory */
static void *
receive_array(SHMRequest request, Size item_size, Size *count)
{
LOCKTAG collectorTag;
shm_mq_result res;
Size len,
i;
Expand All @@ -644,14 +646,11 @@ receive_array(SHMRequest request, Size item_size, Size *count)
MemoryContext oldctx;

/* Ensure nobody else trying to send request to queue */
pgws_init_lock_tag(&queueTag, PGWS_QUEUE_LOCK);
LockAcquire(&queueTag, ExclusiveLock, false, false);

pgws_init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK);
LockAcquire(&collectorTag, ExclusiveLock, false, false);
LWLockAcquire(queue_lock, LW_EXCLUSIVE);
LWLockAcquire(collector_lock, LW_EXCLUSIVE);
recv_mq = shm_mq_create(pgws_collector_mq, COLLECTOR_QUEUE_SIZE);
pgws_collector_hdr->request = request;
LockRelease(&collectorTag, ExclusiveLock, false);
LWLockRelease(collector_lock);

/*
* Check that the collector was started to avoid NULL
Expand Down Expand Up @@ -711,7 +710,7 @@ receive_array(SHMRequest request, Size item_size, Size *count)

/* We still have to detach and release lock during normal operation. */
shm_mq_detach(recv_mqh);
LockRelease(&queueTag, ExclusiveLock, false);
LWLockRelease(queue_lock);

return result;
}
Expand Down Expand Up @@ -814,18 +813,12 @@ PG_FUNCTION_INFO_V1(pg_wait_sampling_reset_profile);
Datum
pg_wait_sampling_reset_profile(PG_FUNCTION_ARGS)
{
LOCKTAG collectorTag;

check_shmem();

pgws_init_lock_tag(&queueTag, PGWS_QUEUE_LOCK);

LockAcquire(&queueTag, ExclusiveLock, false, false);

pgws_init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK);
LockAcquire(&collectorTag, ExclusiveLock, false, false);
LWLockAcquire(queue_lock, LW_EXCLUSIVE);
LWLockAcquire(collector_lock, LW_EXCLUSIVE);
pgws_collector_hdr->request = PROFILE_RESET;
LockRelease(&collectorTag, ExclusiveLock, false);
LWLockRelease(collector_lock);

/*
* Check that the collector was started to avoid NULL
Expand All @@ -837,7 +830,7 @@ pg_wait_sampling_reset_profile(PG_FUNCTION_ARGS)

SetLatch(pgws_collector_hdr->latch);

LockRelease(&queueTag, ExclusiveLock, false);
LWLockRelease(queue_lock);

PG_RETURN_VOID();
}
Expand Down
11 changes: 7 additions & 4 deletions pg_wait_sampling.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
#include "datatype/timestamp.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/lwlock.h"
#include "storage/shm_mq.h"

#define PG_WAIT_SAMPLING_MAGIC 0xCA94B107
#define COLLECTOR_QUEUE_SIZE (16 * 1024)
#define HISTORY_TIME_MULTIPLIER 10
#define PGWS_QUEUE_LOCK 0
#define PGWS_COLLECTOR_LOCK 1
#define PGWS_QUEUE_LOCK_NAME "pgws_queue_lock"
#define PGWS_COLLECTOR_LOCK_NAME "pgws_collector_lock"

typedef struct
{
Expand Down Expand Up @@ -68,10 +69,12 @@ extern int pgws_profileQueries;
extern bool pgws_sampleCpu;

/* pg_wait_sampling.c */
extern CollectorShmqHeader *pgws_collector_hdr;
extern shm_mq *pgws_collector_mq;
extern uint64 *pgws_proc_queryids;
extern void pgws_init_lock_tag(LOCKTAG *tag, uint32 lock);
extern CollectorShmqHeader *pgws_collector_hdr;

extern LWLock *collector_lock;

extern bool pgws_should_sample_proc(PGPROC *proc, int *pid_p, uint32 *wait_event_info_p);

/* collector.c */
Expand Down