[Openais] corosync - CPG model_init + callback with totem ringid and members
Steven Dake
sdake at redhat.com
Mon Apr 19 11:23:20 PDT 2010
good for merge
On Thu, 2010-04-08 at 16:57 +0200, Jan Friesse wrote:
> Included is patch solving 2nd problem.
>
> In first problem, I agree with Chrissie, and really don't have any
> single idea how to make regular confchg precede totem_confchg.
>
> Christine Caulfield wrote:
> > On 07/04/10 20:32, David Teigland wrote:
> >> On Tue, Apr 06, 2010 at 02:05:00PM +0200, Jan Friesse wrote:
> >>> Same patch but rebased on top of Steve's change (today trunk).
> >>
> >> Thanks, this is mostly working well, but I've found one problem, and one
> >> additional thing I need (mentioned on irc already):
> >>
> >> 1. When a node joins, I get the totem callback before the corresponding
> >> confchg callback. When a node leaves I get them in the expected order:
> >> confchg followed by totem callback.
> >
> >
> > That *is* the expected order, as far as CPG is concerned anyway. The
> > > process is not deemed to be a member of the group until all nodes have
> > > seen its join message. It also makes more logical sense because the node
> > has to join the cluster before the process joins the group.
> >
> >
> >> 2. When my app starts up it needs to be able to get the current ring id,
> >> so we need to be able to get/force an initial totem callback after a
> >> cpg_join that indicates the current ring id.
> >>
> >>
> >> I've also had a problem getting the current sequence number through
> >> libcman/cman_get_cluster()/ci_generation ---
> >>
> >> On node 2 I see:
> >>
> >> in cman_dispatch statechange callback:
> >> call cman_get_cluster(), get generation 2124
> >> call cman_get_nodes(), see node 1 removed
> >>
> >> in cman_dispatch statechange callback:
> >> call cman_get_cluster(), get generation 2128
> >> call cman_get_nodes(), see node 1 added
> >>
> >> in cman_dispatch statechange callback:
> >> call cman_get_cluster(), get generation 2128 (expect 2132)
> >> call cman_get_nodes(), see node 1 removed
> >>
> >> in cman_dispatch statechange callback:
> >> call cman_get_cluster(), get generation 2136
> >> call cman_get_nodes(), see node 1 added
> >>
> >> The second time node 1 is removed I get the previous generation when
> >> node 1 was added instead of generation 2132 which the callback is for.
> >>
> >> On node 4 I do get generation 2132 in that callback as expected. So it
> >> seems like it could be a race, I've only gone through this test once.
> >>
> >
> > There is almost certainly a race there. The ring IDs need to be
> > delivered at the same time as the change notifications.
> >
>
> Chrissie,
> is that problem in cman or in my patch?
>
> > Chrissie
> >
>
> Regards,
> Honza
> plain text document attachment (2010-04-08-cpg_model+totem_cb.patch)
> commit 0d509f4bf23f618c940c3bcdd7cf0e97faf64876
> Author: Jan Friesse <jfriesse at redhat.com>
> Date: Thu Apr 8 16:48:45 2010 +0200
>
> CPG model_initialize and ringid + members callback
>
> Patch adds new function to initialize cpg, cpg_model_initialize. Model
> is a set of callbacks. With this function, future additions of models
> should be possible without changing the ABI.
>
> Patch also contains callback in CPG_MODEL_V1 for notification about
> Totem membership changes.
>
> diff --git a/trunk/include/corosync/cpg.h b/trunk/include/corosync/cpg.h
> index b5609df..6189eb5 100644
> --- a/trunk/include/corosync/cpg.h
> +++ b/trunk/include/corosync/cpg.h
> @@ -78,6 +78,10 @@ typedef enum {
> CPG_ITERATION_ALL = 3,
> } cpg_iteration_type_t;
>
> +typedef enum {
> + CPG_MODEL_V1 = 1,
> +} cpg_model_t;
> +
> struct cpg_address {
> uint32_t nodeid;
> uint32_t pid;
> @@ -98,6 +102,11 @@ struct cpg_iteration_description_t {
> uint32_t pid;
> };
>
> +struct cpg_ring_id {
> + uint32_t nodeid;
> + uint64_t seq;
> +};
> +
> typedef void (*cpg_deliver_fn_t) (
> cpg_handle_t handle,
> const struct cpg_name *group_name,
> @@ -117,11 +126,32 @@ typedef void (*cpg_confchg_fn_t) (
> const struct cpg_address *left_list, size_t left_list_entries,
> const struct cpg_address *joined_list, size_t joined_list_entries);
>
> +typedef void (*cpg_totem_confchg_fn_t) (
> + cpg_handle_t handle,
> + struct cpg_ring_id ring_id,
> + uint32_t member_list_entries,
> + const uint32_t *member_list);
> +
> typedef struct {
> cpg_deliver_fn_t cpg_deliver_fn;
> cpg_confchg_fn_t cpg_confchg_fn;
> } cpg_callbacks_t;
>
> +typedef struct {
> + cpg_model_t model;
> +} cpg_model_data_t;
> +
> +#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF 0x01
> +
> +typedef struct {
> + cpg_model_t model;
> + cpg_deliver_fn_t cpg_deliver_fn;
> + cpg_confchg_fn_t cpg_confchg_fn;
> + cpg_totem_confchg_fn_t cpg_totem_confchg_fn;
> + unsigned int flags;
> +} cpg_model_v1_data_t;
> +
> +
> /** @} */
>
> /*
> @@ -132,6 +162,15 @@ cs_error_t cpg_initialize (
> cpg_callbacks_t *callbacks);
>
> /*
> + * Create a new cpg connection, initialize with model
> + */
> +cs_error_t cpg_model_initialize (
> + cpg_handle_t *handle,
> + cpg_model_t model,
> + cpg_model_data_t *model_data,
> + void *context);
> +
> +/*
> * Close the cpg handle
> */
> cs_error_t cpg_finalize (
> diff --git a/trunk/include/corosync/ipc_cpg.h b/trunk/include/corosync/ipc_cpg.h
> index 8f55ae8..a1ecabf 100644
> --- a/trunk/include/corosync/ipc_cpg.h
> +++ b/trunk/include/corosync/ipc_cpg.h
> @@ -65,6 +65,7 @@ enum res_cpg_types {
> MESSAGE_RES_CPG_ITERATIONNEXT = 10,
> MESSAGE_RES_CPG_ITERATIONFINALIZE = 11,
> MESSAGE_RES_CPG_FINALIZE = 12,
> + MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK = 13,
> };
>
> enum lib_cpg_confchg_reason {
> @@ -149,10 +150,24 @@ static inline void marshall_from_mar_cpg_iteration_description_t(
> marshall_from_mar_cpg_name_t (&dest->group, &src->group);
> };
>
> +typedef struct {
> + mar_uint32_t nodeid __attribute__((aligned(8)));
> + mar_uint64_t seq __attribute__((aligned(8)));
> +} mar_cpg_ring_id_t;
> +
> +static inline void marshall_from_mar_cpg_ring_id_t (
> + struct cpg_ring_id *dest,
> + const mar_cpg_ring_id_t *src)
> +{
> + dest->nodeid = src->nodeid;
> + dest->seq = src->seq;
> +}
> +
> struct req_lib_cpg_join {
> coroipc_request_header_t header __attribute__((aligned(8)));
> mar_cpg_name_t group_name __attribute__((aligned(8)));
> mar_uint32_t pid __attribute__((aligned(8)));
> + mar_uint32_t flags __attribute__((aligned(8)));
> };
>
> struct res_lib_cpg_join {
> @@ -238,6 +253,13 @@ struct res_lib_cpg_confchg_callback {
> // struct cpg_address joined_list[];
> };
>
> +struct res_lib_cpg_totem_confchg_callback {
> + coroipc_response_header_t header __attribute__((aligned(8)));
> + mar_cpg_ring_id_t ring_id __attribute__((aligned(8)));
> + mar_uint32_t member_list_entries __attribute__((aligned(8)));
> + mar_uint32_t member_list[];
> +};
> +
> struct req_lib_cpg_leave {
> coroipc_request_header_t header __attribute__((aligned(8)));
> mar_cpg_name_t group_name __attribute__((aligned(8)));
> diff --git a/trunk/lib/cpg.c b/trunk/lib/cpg.c
> index 6b58784..993a28a 100644
> --- a/trunk/lib/cpg.c
> +++ b/trunk/lib/cpg.c
> @@ -62,8 +62,11 @@
> struct cpg_inst {
> hdb_handle_t handle;
> int finalize;
> - cpg_callbacks_t callbacks;
> void *context;
> + union {
> + cpg_model_data_t model_data;
> + cpg_model_v1_data_t model_v1_data;
> + };
> struct list_head iteration_list_head;
> };
>
> @@ -118,9 +121,32 @@ cs_error_t cpg_initialize (
> cpg_handle_t *handle,
> cpg_callbacks_t *callbacks)
> {
> + cpg_model_v1_data_t model_v1_data;
> +
> + memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
> +
> + if (callbacks) {
> + model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
> + model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
> + }
> +
> + return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
> +}
> +
> +cs_error_t cpg_model_initialize (
> + cpg_handle_t *handle,
> + cpg_model_t model,
> + cpg_model_data_t *model_data,
> + void *context)
> +{
> cs_error_t error;
> struct cpg_inst *cpg_inst;
>
> + if (model != CPG_MODEL_V1) {
> + error = CPG_ERR_INVALID_PARAM;
> + goto error_no_destroy;
> + }
> +
> error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
> if (error != CS_OK) {
> goto error_no_destroy;
> @@ -142,10 +168,26 @@ cs_error_t cpg_initialize (
> goto error_put_destroy;
> }
>
> - if (callbacks) {
> - memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t));
> + if (model_data != NULL) {
> + switch (model) {
> + case CPG_MODEL_V1:
> + memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
> + if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
> + error = CS_ERR_INVALID_PARAM;
> +
> + goto error_destroy;
> + }
> + break;
> + default:
> + error = CS_ERR_LIBRARY;
> + goto error_destroy;
> + break;
> + }
> }
>
> + cpg_inst->model_data.model = model;
> + cpg_inst->context = context;
> +
> list_init(&cpg_inst->iteration_list_head);
>
> hdb_handle_put (&cpg_handle_t_db, *handle);
> @@ -283,7 +325,8 @@ cs_error_t cpg_dispatch (
> struct cpg_inst *cpg_inst;
> struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
> struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
> - cpg_callbacks_t callbacks;
> + struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
> + struct cpg_inst cpg_inst_copy;
> coroipc_response_header_t *dispatch_data;
> struct cpg_address member_list[CPG_MEMBERS_MAX];
> struct cpg_address left_list[CPG_MEMBERS_MAX];
> @@ -292,6 +335,8 @@ cs_error_t cpg_dispatch (
> mar_cpg_address_t *left_list_start;
> mar_cpg_address_t *joined_list_start;
> unsigned int i;
> + struct cpg_ring_id ring_id;
> + uint32_t totem_member_list[CPG_MEMBERS_MAX];
>
> error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
> if (error != CS_OK) {
> @@ -332,74 +377,96 @@ cs_error_t cpg_dispatch (
> * A risk of this dispatch method is that the callback routines may
> * operate at the same time that cpgFinalize has been called.
> */
> - memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t));
> - /*
> - * Dispatch incoming message
> - */
> - switch (dispatch_data->id) {
> - case MESSAGE_RES_CPG_DELIVER_CALLBACK:
> - if (callbacks.cpg_deliver_fn == NULL) {
> + memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
> +
> + switch (cpg_inst_copy.model_data.model) {
> + case CPG_MODEL_V1:
> + /*
> + * Dispatch incoming message
> + */
> + switch (dispatch_data->id) {
> + case MESSAGE_RES_CPG_DELIVER_CALLBACK:
> + if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
> + break;
> + }
> +
> + res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
> +
> + marshall_from_mar_cpg_name_t (
> + &group_name,
> + &res_cpg_deliver_callback->group_name);
> +
> + cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
> + &group_name,
> + res_cpg_deliver_callback->nodeid,
> + res_cpg_deliver_callback->pid,
> + &res_cpg_deliver_callback->message,
> + res_cpg_deliver_callback->msglen);
> break;
> - }
> -
> - res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
>
> - marshall_from_mar_cpg_name_t (
> - &group_name,
> - &res_cpg_deliver_callback->group_name);
> + case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
> + if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
> + break;
> + }
> +
> + res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
> +
> + for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
> + marshall_from_mar_cpg_address_t (&member_list[i],
> + &res_cpg_confchg_callback->member_list[i]);
> + }
> + left_list_start = res_cpg_confchg_callback->member_list +
> + res_cpg_confchg_callback->member_list_entries;
> + for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
> + marshall_from_mar_cpg_address_t (&left_list[i],
> + &left_list_start[i]);
> + }
> + joined_list_start = res_cpg_confchg_callback->member_list +
> + res_cpg_confchg_callback->member_list_entries +
> + res_cpg_confchg_callback->left_list_entries;
> + for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
> + marshall_from_mar_cpg_address_t (&joined_list[i],
> + &joined_list_start[i]);
> + }
> + marshall_from_mar_cpg_name_t (
> + &group_name,
> + &res_cpg_confchg_callback->group_name);
> +
> + cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
> + &group_name,
> + member_list,
> + res_cpg_confchg_callback->member_list_entries,
> + left_list,
> + res_cpg_confchg_callback->left_list_entries,
> + joined_list,
> + res_cpg_confchg_callback->joined_list_entries);
>
> - callbacks.cpg_deliver_fn (handle,
> - &group_name,
> - res_cpg_deliver_callback->nodeid,
> - res_cpg_deliver_callback->pid,
> - &res_cpg_deliver_callback->message,
> - res_cpg_deliver_callback->msglen);
> - break;
> -
> - case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
> - if (callbacks.cpg_confchg_fn == NULL) {
> break;
> - }
> -
> - res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
> -
> - for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
> - marshall_from_mar_cpg_address_t (&member_list[i],
> - &res_cpg_confchg_callback->member_list[i]);
> - }
> - left_list_start = res_cpg_confchg_callback->member_list +
> - res_cpg_confchg_callback->member_list_entries;
> - for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
> - marshall_from_mar_cpg_address_t (&left_list[i],
> - &left_list_start[i]);
> - }
> - joined_list_start = res_cpg_confchg_callback->member_list +
> - res_cpg_confchg_callback->member_list_entries +
> - res_cpg_confchg_callback->left_list_entries;
> - for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
> - marshall_from_mar_cpg_address_t (&joined_list[i],
> - &joined_list_start[i]);
> - }
> - marshall_from_mar_cpg_name_t (
> - &group_name,
> - &res_cpg_confchg_callback->group_name);
> -
> - callbacks.cpg_confchg_fn (handle,
> - &group_name,
> - member_list,
> - res_cpg_confchg_callback->member_list_entries,
> - left_list,
> - res_cpg_confchg_callback->left_list_entries,
> - joined_list,
> - res_cpg_confchg_callback->joined_list_entries);
> - break;
> -
> - default:
> - coroipcc_dispatch_put (cpg_inst->handle);
> - error = CS_ERR_LIBRARY;
> - goto error_put;
> - break;
> - }
> + case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
> + if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
> + break;
> + }
> +
> + res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
> +
> + marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
> + for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
> + totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
> + }
> +
> + cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
> + ring_id,
> + res_cpg_totem_confchg_callback->member_list_entries,
> + totem_member_list);
> + break;
> + default:
> + coroipcc_dispatch_put (cpg_inst->handle);
> + error = CS_ERR_LIBRARY;
> + goto error_put;
> + break;
> + } /* - switch (dispatch_data->id) */
> + break; /* case CPG_MODEL_V1 */
> + } /* - switch (cpg_inst_copy.model_data.model) */
> coroipcc_dispatch_put (cpg_inst->handle);
>
> /*
> @@ -434,6 +501,14 @@ cs_error_t cpg_join (
> req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
> req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
> req_lib_cpg_join.pid = getpid();
> + req_lib_cpg_join.flags = 0;
> +
> + switch (cpg_inst->model_data.model) {
> + case CPG_MODEL_V1:
> + req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
> + break;
> + }
> +
> marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
> group);
>
> diff --git a/trunk/lib/libcpg.verso b/trunk/lib/libcpg.verso
> index 1454f6e..ee74734 100644
> --- a/trunk/lib/libcpg.verso
> +++ b/trunk/lib/libcpg.verso
> @@ -1 +1 @@
> -4.0.1
> +4.1.0
> diff --git a/trunk/man/Makefile.am b/trunk/man/Makefile.am
> index da01c2e..fe8f71b 100644
> --- a/trunk/man/Makefile.am
> +++ b/trunk/man/Makefile.am
> @@ -71,6 +71,7 @@ dist_man_MANS = \
> cpg_leave.3 \
> cpg_local_get.3 \
> cpg_mcast_joined.3 \
> + cpg_model_initialize.3 \
> cpg_zcb_mcast_joined.3 \
> cpg_zcb_alloc.3 \
> cpg_zcb_free.3 \
> diff --git a/trunk/man/cpg_initialize.3 b/trunk/man/cpg_initialize.3
> index ce6e25a..6d4bc51 100644
> --- a/trunk/man/cpg_initialize.3
> +++ b/trunk/man/cpg_initialize.3
> @@ -41,7 +41,10 @@ cpg_initialize \- Create a new connection to the CPG service
> .SH DESCRIPTION
> The
> .B cpg_initialize
> -function is used to initialize a connection to the closed process groups API.
> +function is used to initialize a connection to the closed process groups API. This function is deprecated
> +and
> +.B cpg_model_initialize
> +should be used in newly written code.
> .PP
> Each application may have several connections to the CPG API. Each application
> uses the
> @@ -167,5 +170,6 @@ The errors are undocumented.
> .BR cpg_context_get (3)
> .BR cpg_context_set (3)
> .BR cpg_local_get (3)
> +.BR cpg_model_initialize (3)
>
> .PP
> diff --git a/trunk/man/cpg_model_initialize.3 b/trunk/man/cpg_model_initialize.3
> new file mode 100644
> index 0000000..8ecf810
> --- /dev/null
> +++ b/trunk/man/cpg_model_initialize.3
> @@ -0,0 +1,227 @@
> +.\"/*
> +.\" * Copyright (c) 2010 Red Hat, Inc.
> +.\" *
> +.\" * All rights reserved.
> +.\" *
> +.\" * Author: Jan Friesse <jfriesse at redhat.com>
> +.\" * Author: Christine Caulfield <ccaulfie at redhat.com>
> +.\" *
> +.\" * This software licensed under BSD license, the text of which follows:
> +.\" *
> +.\" * Redistribution and use in source and binary forms, with or without
> +.\" * modification, are permitted provided that the following conditions are met:
> +.\" *
> +.\" * - Redistributions of source code must retain the above copyright notice,
> +.\" * this list of conditions and the following disclaimer.
> +.\" * - Redistributions in binary form must reproduce the above copyright notice,
> +.\" * this list of conditions and the following disclaimer in the documentation
> +.\" * and/or other materials provided with the distribution.
> +.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its
> +.\" * contributors may be used to endorse or promote products derived from this
> +.\" * software without specific prior written permission.
> +.\" *
> +.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
> +.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
> +.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
> +.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
> +.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
> +.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> +.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
> +.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
> +.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
> +.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
> +.\" * THE POSSIBILITY OF SUCH DAMAGE.
> +.\" */
> +.TH CPG_MODEL_INITIALIZE 3 2010-04-07 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
> +.SH NAME
> +cpg_model_initialize \- Create a new connection to the CPG service
> +.SH SYNOPSIS
> +.B #include <corosync/cpg.h>
> +.sp
> +.BI "cs_error_t cpg_model_initialize(cpg_handle_t *" handle ", cpg_model_t " model ", cpg_model_data_t *" model_data ", void *" context ");
> +
> +.SH DESCRIPTION
> +The
> +.B cpg_model_initialize
> +function is used to initialize a connection to the closed process groups API.
> +.PP
> +Each application may have several connections to the CPG API. Each application
> +uses the
> +.I handle
> +argument to uniquely identify the connection. The
> +.I handle
> +argument is then used in other function calls to identify the connection to be used
> +for communication with the CPG service.
> +.PP
> +Argument
> +.I model
> +is used to explicitly choose set of callbacks and internal parameters. Currently only model
> +.I CPG_MODEL_V1
> +is defined.
> +.PP
> +Callbacks and internal parameters are passed by
> +.I model_data
> +argument. This is a cast pointer (the idea is similar to struct sockaddr in the socket functions) to one of the structures
> +corresponding to chosen model. Currently only
> +.I cpg_model_v1_data_t
> +is needed.
> +.SH MODEL_V1
> +The
> +.I MODEL_V1
> +is backwards compatible with original callbacks initialized by
> +.I cpg_initialize
> +but new callback
> +.I cpg_totem_confchg_fn
> +is defined.
> +.PP
> +Every time a CPG event occurs within the joined group, one of the callbacks specified by the argument
> +.I model_data
> +is called. The callback functions are described by the following type definitions:
> +.PP
> +.PP
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.ta 4n 20n 32n
> +
> +typedef void (*cpg_deliver_fn_t) (
> + cpg_handle_t handle,
> + const struct cpg_name *group_name,
> + uint32_t nodeid,
> + uint32_t pid,
> + const void *msg,
> + size_t msg_len);
> +
> +
> +typedef void (*cpg_confchg_fn_t) (
> + cpg_handle_t handle,
> + const struct cpg_name *group_name,
> + const struct cpg_address *member_list, size_t member_list_entries,
> + const struct cpg_address *left_list, size_t left_list_entries,
> + const struct cpg_address *joined_list, size_t joined_list_entries);
> +
> +
> +typedef void (*cpg_totem_confchg_fn_t) (
> + cpg_handle_t handle,
> + struct cpg_ring_id ring_id,
> + uint32_t member_list_entries,
> + const uint32_t *member_list);
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +.PP
> +The
> +.I cpg_model_v1_data_t
> +structure is defined as:
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.PP
> +typedef struct {
> + cpg_model_t model;
> + cpg_deliver_fn_t cpg_deliver_fn;
> + cpg_confchg_fn_t cpg_confchg_fn;
> + cpg_totem_confchg_fn_t cpg_totem_confchg_fn;
> +} cpg_model_v1_data_t;
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +When a configuration change occurs or a message is to be delivered one of the callbacks
> +is called from the
> +.B cpg_dispatch()
> +function. If a configuration change occurs,
> +.I cpg_confchg_fn
> +is called. If a delivery of a message occurs,
> +.I cpg_deliver_fn
> +is called.
> +When totem membership change occurs,
> +.I cpg_totem_confchg_fn
> +is called.
> +The
> +.I cpg_address
> +structure is defined
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.PP
> +struct cpg_address {
> + unsigned int nodeid;
> + unsigned int pid;
> + unsigned int reason;
> +};
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +where nodeid is a 32 bit unique node identifier, pid is the process ID of the process that has joined/left the group
> +or sent the message, and reason is an integer code indicating why the node joined/left the group.
> +.PP
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.PP
> +CPG_REASON_JOIN - the process joined a group using cpg_join().
> +CPG_REASON_LEAVE - the process left a group using cpg_leave()
> +CPG_REASON_NODEDOWN - the process left a group because the node left the cluster.
> +CPG_REASON_NODEUP - the process joined a group because it was already a member of a group on a node that has just joined the cluster
> +CPG_REASON_PROCDOWN - the process left a group without calling cpg_leave()
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +The
> +.I cpg_ring_id
> +structure is defined
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.PP
> +struct cpg_ring_id {
> + uint32_t nodeid;
> + uint64_t seq;
> +};
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +where
> +.I nodeid
> +is the id of the node that is the current Totem leader and seq is an increasing number.
> +
> +.PP
> +.SH RETURN VALUE
> +This call returns the CS_OK value if successful, otherwise an error is returned.
> +.PP
> +.SH ERRORS
> +The errors are undocumented.
> +.SH "SEE ALSO"
> +.BR cpg_overview (8),
> +.BR cpg_initialize (3),
> +.BR cpg_finalize (3),
> +.BR cpg_fd_get (3),
> +.BR cpg_dispatch (3),
> +.BR cpg_join (3),
> +.BR cpg_leave (3),
> +.BR cpg_mcast_joined (3),
> +.BR cpg_membership_get (3)
> +.BR cpg_zcb_alloc (3)
> +.BR cpg_zcb_free (3)
> +.BR cpg_zcb_mcast_joined (3)
> +.BR cpg_context_get (3)
> +.BR cpg_context_set (3)
> +.BR cpg_local_get (3)
> +.BR cpg_model_initialize (3)
> +
> +.PP
> diff --git a/trunk/man/cpg_overview.8 b/trunk/man/cpg_overview.8
> index 84d4c3b..1f268a5 100644
> --- a/trunk/man/cpg_overview.8
> +++ b/trunk/man/cpg_overview.8
> @@ -61,6 +61,7 @@ access the corosync services.
> .BR cpg_join (3),
> .BR cpg_leave (3),
> .BR cpg_mcast_joined (3),
> +.BR cpg_model_initialize (3),
> .BR cpg_membership_get (3)
> .BR cpg_zcb_alloc (3)
> .BR cpg_zcb_free (3)
> diff --git a/trunk/services/cpg.c b/trunk/services/cpg.c
> index ede426f..829767a 100644
> --- a/trunk/services/cpg.c
> +++ b/trunk/services/cpg.c
> @@ -133,6 +133,8 @@ struct cpg_pd {
> mar_cpg_name_t group_name;
> uint32_t pid;
> enum cpd_state cpd_state;
> + unsigned int flags;
> + int initial_totem_conf_sent;
> struct list_head list;
> struct list_head iteration_instance_list_head;
> };
> @@ -160,6 +162,8 @@ static struct corosync_api_v1 *api = NULL;
>
> static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
>
> +static mar_cpg_ring_id_t last_sync_ring_id;
> +
> struct process_info {
> unsigned int nodeid;
> uint32_t pid;
> @@ -255,6 +259,11 @@ static void cpg_sync_activate (void);
>
> static void cpg_sync_abort (void);
>
> +static int notify_lib_totem_membership (
> + void *conn,
> + int member_list_entries,
> + const unsigned int *member_list);
> +
> /*
> * Library Handler Definition
> */
> @@ -432,6 +441,9 @@ static void cpg_sync_init_v2 (
> sizeof (unsigned int));
> my_member_list_entries = member_list_entries;
>
> + last_sync_ring_id.nodeid = ring_id->rep.nodeid;
> + last_sync_ring_id.seq = ring_id->seq;
> +
> for (i = 0; i < my_member_list_entries; i++) {
> if (my_member_list[i] < lowest_nodeid) {
> lowest_nodeid = my_member_list[i];
> @@ -482,13 +494,50 @@ static void cpg_sync_activate (void)
> memcpy (my_old_member_list, my_member_list,
> my_member_list_entries * sizeof (unsigned int));
> my_old_member_list_entries = my_member_list_entries;
> +
> + notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
> }
>
> static void cpg_sync_abort (void)
> {
> }
>
> +static int notify_lib_totem_membership (
> + void *conn,
> + int member_list_entries,
> + const unsigned int *member_list)
> +{
> + struct list_head *iter;
> + char *buf;
> + int size;
> + struct res_lib_cpg_totem_confchg_callback *res;
> +
> + size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
> + sizeof(mar_uint32_t) * (member_list_entries);
> + buf = alloca(size);
> + if (!buf)
> + return CPG_ERR_LIBRARY;
>
> + res = (struct res_lib_cpg_totem_confchg_callback *)buf;
> + res->member_list_entries = member_list_entries;
> + res->header.size = size;
> + res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK;
> + res->header.error = CS_OK;
> +
> + memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
> + memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
> +
> + if (conn == NULL) {
> + for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
> + struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
> + api->ipc_dispatch_send (cpg_pd->conn, buf, size);
> + }
> + } else {
> + api->ipc_dispatch_send (conn, buf, size);
> + }
> +
> + return CPG_OK;
> +}
>
> static int notify_lib_joinlist(
> const mar_cpg_name_t *group_name,
> @@ -604,6 +653,20 @@ static int notify_lib_joinlist(
> }
> }
>
> +
> + /*
> + * Traverse through the cpds and send the totem membership to each cpd to which it has not been sent yet
> + */
> + for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
> + struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
> +
> + if ((cpd->flags & CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF) && (cpd->initial_totem_conf_sent == 0)) {
> + cpd->initial_totem_conf_sent = 1;
> +
> + notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
> + }
> + }
> +
> return CPG_OK;
> }
>
> @@ -1093,6 +1156,7 @@ static void message_handler_req_lib_cpg_join (void *conn, const void *message)
> error = CPG_OK;
> cpd->cpd_state = CPD_STATE_JOIN_STARTED;
> cpd->pid = req_lib_cpg_join->pid;
> + cpd->flags = req_lib_cpg_join->flags;
> memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
> sizeof (cpd->group_name));
>
> diff --git a/trunk/test/testcpg.c b/trunk/test/testcpg.c
> index 2abe83d..c08406b 100644
> --- a/trunk/test/testcpg.c
> +++ b/trunk/test/testcpg.c
> @@ -132,9 +132,29 @@ static void ConfchgCallback (
> }
> }
>
> -static cpg_callbacks_t callbacks = {
> +static void TotemConfchgCallback (
> + cpg_handle_t handle,
> + struct cpg_ring_id ring_id,
> + uint32_t member_list_entries,
> + const uint32_t *member_list)
> +{
> + int i;
> +
> + printf("\nTotemConfchgCallback: ringid (%u.%llu)\n", ring_id.nodeid, ring_id.seq);
> +
> + printf("active processors %lu: ",
> + (unsigned long int) member_list_entries);
> + for (i=0; i<member_list_entries; i++) {
> + printf("%d ", member_list[i]);
> + }
> + printf ("\n");
> +}
> +
> +static cpg_model_v1_data_t callbacks = {
> .cpg_deliver_fn = DeliverCallback,
> .cpg_confchg_fn = ConfchgCallback,
> + .cpg_totem_confchg_fn = TotemConfchgCallback,
> + .flags = CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF,
> };
>
> static void sigintr_handler (int signum) __attribute__((__noreturn__));
> @@ -170,7 +190,7 @@ int main (int argc, char *argv[]) {
> group_name.length = 6;
> }
>
> - result = cpg_initialize (&handle, &callbacks);
> + result = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&callbacks, NULL);
> if (result != CS_OK) {
> printf ("Could not initialize Cluster Process Group API instance error %d\n", result);
> exit (1);
> _______________________________________________
> Openais mailing list
> Openais at lists.linux-foundation.org
> https://lists.linux-foundation.org/mailman/listinfo/openais
More information about the Openais
mailing list