[Openais] corosync - CPG model_init + callback with totem ringid and members

Steven Dake sdake at redhat.com
Mon Apr 19 11:23:20 PDT 2010


good for merge

On Thu, 2010-04-08 at 16:57 +0200, Jan Friesse wrote:
> Included is patch solving 2nd problem.
> 
> In first problem, I agree with Chrissie, and really don't have any
> single idea how to make regular confchg precede totem_confchg.
> 
> Christine Caulfield wrote:
> > On 07/04/10 20:32, David Teigland wrote:
> >> On Tue, Apr 06, 2010 at 02:05:00PM +0200, Jan Friesse wrote:
> >>> Same patch but rebased on top of Steve's change (today trunk).
> >>
> >> Thanks, this is mostly working well, but I've found one problem, and one
> >> additional thing I need (mentioned on irc already):
> >>
> >> 1. When a node joins, I get the totem callback before the corresponding
> >> confchg callback.  When a node leaves I get them in the expected order:
> >> confchg followed by totem callback.
> > 
> > 
> > > That *is* the expected order, as far as CPG is concerned anyway. The
> > > process is not deemed to be a member of the group until all nodes have
> > > seen its join message. It also makes more logical sense because the node
> > > has to join the cluster before the process joins the group.
> > 
> > 
> >> 2. When my app starts up it needs to be able to get the current ring id,
> >> so we need to be able to get/force an initial totem callback after a
> >> cpg_join that indicates the current ring id.
> >>
> >>
> >> I've also had a problem getting the current sequence number through
> >> libcman/cman_get_cluster()/ci_generation ---
> >>
> >> On node 2 I see:
> >>
> >> in cman_dispatch statechange callback:
> >>    call cman_get_cluster(), get generation 2124
> >>    call cman_get_nodes(), see node 1 removed
> >>
> >> in cman_dispatch statechange callback:
> >>    call cman_get_cluster(), get generation 2128
> >>    call cman_get_nodes(), see node 1 added
> >>
> >> in cman_dispatch statechange callback:
> >>    call cman_get_cluster(), get generation 2128 (expect 2132)
> >>    call cman_get_nodes(), see node 1 removed
> >>
> >> in cman_dispatch statechange callback:
> >>    call cman_get_cluster(), get generation 2136
> >>    call cman_get_nodes(), see node 1 added
> >>
> >> The second time node 1 is removed I get the previous generation when
> >> node 1 was added instead of generation 2132 which the callback is for.
> >>
> >> On node 4 I do get generation 2132 in that callback as expected.  So it
> >> seems like it could be a race, I've only gone through this test once.
> >>
> > 
> > There is almost certainly a race there. The ring IDs need to be
> > delivered at the same time as the change notifications.
> > 
> 
> Chrissie,
> is that problem in cman or in my patch?
> 
> > Chrissie
> > 
> 
> Regards,
>   Honza
> plain text document attachment (2010-04-08-cpg_model+totem_cb.patch)
> commit 0d509f4bf23f618c940c3bcdd7cf0e97faf64876
> Author: Jan Friesse <jfriesse at redhat.com>
> Date:   Thu Apr 8 16:48:45 2010 +0200
> 
>     CPG model_initialize and ringid + members callback
>     
>     Patch adds a new function to initialize cpg, cpg_model_initialize. A model
>     is a set of callbacks. With this function, future additions of models
>     should be possible without changing the ABI.
>     
>     Patch also contains callback in CPG_MODEL_V1 for notification about
>     Totem membership changes.
> 
> diff --git a/trunk/include/corosync/cpg.h b/trunk/include/corosync/cpg.h
> index b5609df..6189eb5 100644
> --- a/trunk/include/corosync/cpg.h
> +++ b/trunk/include/corosync/cpg.h
> @@ -78,6 +78,10 @@ typedef enum {
>  	CPG_ITERATION_ALL = 3,
>  } cpg_iteration_type_t;
>  
> +typedef enum {
> +	CPG_MODEL_V1 = 1,
> +} cpg_model_t;
> +
>  struct cpg_address {
>  	uint32_t nodeid;
>  	uint32_t pid;
> @@ -98,6 +102,11 @@ struct cpg_iteration_description_t {
>  	uint32_t pid;
>  };
>  
> +struct cpg_ring_id {
> +	uint32_t nodeid;
> +	uint64_t seq;
> +};
> +
>  typedef void (*cpg_deliver_fn_t) (
>  	cpg_handle_t handle,
>  	const struct cpg_name *group_name,
> @@ -117,11 +126,32 @@ typedef void (*cpg_confchg_fn_t) (
>  	const struct cpg_address *left_list, size_t left_list_entries,
>  	const struct cpg_address *joined_list, size_t joined_list_entries);
>  
> +typedef void (*cpg_totem_confchg_fn_t) (
> +	cpg_handle_t handle,
> +	struct cpg_ring_id ring_id,
> +	uint32_t member_list_entries,
> +	const uint32_t *member_list);
> +
>  typedef struct {
>  	cpg_deliver_fn_t cpg_deliver_fn;
>  	cpg_confchg_fn_t cpg_confchg_fn;
>  } cpg_callbacks_t;
>  
> +typedef struct {
> +	cpg_model_t model;
> +} cpg_model_data_t;
> +
> +#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF 0x01
> +
> +typedef struct {
> +	cpg_model_t model;
> +	cpg_deliver_fn_t cpg_deliver_fn;
> +	cpg_confchg_fn_t cpg_confchg_fn;
> +	cpg_totem_confchg_fn_t cpg_totem_confchg_fn;
> +	unsigned int flags;
> +} cpg_model_v1_data_t;
> +
> +
>  /** @} */
>  
>  /*
> @@ -132,6 +162,15 @@ cs_error_t cpg_initialize (
>  	cpg_callbacks_t *callbacks);
>  
>  /*
> + * Create a new cpg connection, initialize with model
> + */
> +cs_error_t cpg_model_initialize (
> +	cpg_handle_t *handle,
> +	cpg_model_t model,
> +	cpg_model_data_t *model_data,
> +	void *context);
> +
> +/*
>   * Close the cpg handle
>   */
>  cs_error_t cpg_finalize (
> diff --git a/trunk/include/corosync/ipc_cpg.h b/trunk/include/corosync/ipc_cpg.h
> index 8f55ae8..a1ecabf 100644
> --- a/trunk/include/corosync/ipc_cpg.h
> +++ b/trunk/include/corosync/ipc_cpg.h
> @@ -65,6 +65,7 @@ enum res_cpg_types {
>  	MESSAGE_RES_CPG_ITERATIONNEXT = 10,
>  	MESSAGE_RES_CPG_ITERATIONFINALIZE = 11,
>  	MESSAGE_RES_CPG_FINALIZE = 12,
> +	MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK = 13,
>  };
>  
>  enum lib_cpg_confchg_reason {
> @@ -149,10 +150,24 @@ static inline void marshall_from_mar_cpg_iteration_description_t(
>  	marshall_from_mar_cpg_name_t (&dest->group, &src->group);
>  };
>  
> +typedef struct {
> +        mar_uint32_t nodeid __attribute__((aligned(8)));
> +        mar_uint64_t seq __attribute__((aligned(8)));
> +} mar_cpg_ring_id_t;
> +
> +static inline void marshall_from_mar_cpg_ring_id_t (
> +	struct cpg_ring_id *dest,
> +	const mar_cpg_ring_id_t *src)
> +{
> +	dest->nodeid = src->nodeid;
> +	dest->seq = src->seq;
> +}
> +
>  struct req_lib_cpg_join {
>  	coroipc_request_header_t header __attribute__((aligned(8)));
>  	mar_cpg_name_t group_name __attribute__((aligned(8)));
>  	mar_uint32_t pid __attribute__((aligned(8)));
> +	mar_uint32_t flags __attribute__((aligned(8)));
>  };
>  
>  struct res_lib_cpg_join {
> @@ -238,6 +253,13 @@ struct res_lib_cpg_confchg_callback {
>  //	struct cpg_address joined_list[];
>  };
>  
> +struct res_lib_cpg_totem_confchg_callback {
> +	coroipc_response_header_t header __attribute__((aligned(8)));
> +	mar_cpg_ring_id_t ring_id __attribute__((aligned(8)));
> +	mar_uint32_t member_list_entries __attribute__((aligned(8)));
> +	mar_uint32_t member_list[];
> +};
> +
>  struct req_lib_cpg_leave {
>  	coroipc_request_header_t header __attribute__((aligned(8)));
>  	mar_cpg_name_t group_name __attribute__((aligned(8)));
> diff --git a/trunk/lib/cpg.c b/trunk/lib/cpg.c
> index 6b58784..993a28a 100644
> --- a/trunk/lib/cpg.c
> +++ b/trunk/lib/cpg.c
> @@ -62,8 +62,11 @@
>  struct cpg_inst {
>  	hdb_handle_t handle;
>  	int finalize;
> -	cpg_callbacks_t callbacks;
>  	void *context;
> +	union {
> +		cpg_model_data_t model_data;
> +		cpg_model_v1_data_t model_v1_data;
> +	};
>  	struct list_head iteration_list_head;
>  };
>  
> @@ -118,9 +121,32 @@ cs_error_t cpg_initialize (
>  	cpg_handle_t *handle,
>  	cpg_callbacks_t *callbacks)
>  {
> +	cpg_model_v1_data_t model_v1_data;
> +
> +	memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
> +
> +	if (callbacks) {
> +		model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
> +		model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
> +	}
> +
> +	return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
> +}
> +
> +cs_error_t cpg_model_initialize (
> +	cpg_handle_t *handle,
> +	cpg_model_t model,
> +	cpg_model_data_t *model_data,
> +	void *context)
> +{
>  	cs_error_t error;
>  	struct cpg_inst *cpg_inst;
>  
> +	if (model != CPG_MODEL_V1) {
> +		error = CPG_ERR_INVALID_PARAM;
> +		goto error_no_destroy;
> +	}
> +
>  	error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
>  	if (error != CS_OK) {
>  		goto error_no_destroy;
> @@ -142,10 +168,26 @@ cs_error_t cpg_initialize (
>  		goto error_put_destroy;
>  	}
>  
> -	if (callbacks) {
> -		memcpy (&cpg_inst->callbacks, callbacks, sizeof (cpg_callbacks_t));
> +	if (model_data != NULL) {
> +		switch (model) {
> +		case CPG_MODEL_V1:
> +			memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
> +			if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
> +				error = CS_ERR_INVALID_PARAM;
> +
> +				goto error_destroy;
> +			}
> +			break;
> +		default:
> +			error = CS_ERR_LIBRARY;
> +			goto error_destroy;
> +			break;
> +		}
>  	}
>  
> +	cpg_inst->model_data.model = model;
> +	cpg_inst->context = context;
> +
>  	list_init(&cpg_inst->iteration_list_head);
>  
>  	hdb_handle_put (&cpg_handle_t_db, *handle);
> @@ -283,7 +325,8 @@ cs_error_t cpg_dispatch (
>  	struct cpg_inst *cpg_inst;
>  	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
>  	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
> -	cpg_callbacks_t callbacks;
> +	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
> +	struct cpg_inst cpg_inst_copy;
>  	coroipc_response_header_t *dispatch_data;
>  	struct cpg_address member_list[CPG_MEMBERS_MAX];
>  	struct cpg_address left_list[CPG_MEMBERS_MAX];
> @@ -292,6 +335,8 @@ cs_error_t cpg_dispatch (
>  	mar_cpg_address_t *left_list_start;
>  	mar_cpg_address_t *joined_list_start;
>  	unsigned int i;
> +	struct cpg_ring_id ring_id;
> +	uint32_t totem_member_list[CPG_MEMBERS_MAX];
>  
>  	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
>  	if (error != CS_OK) {
> @@ -332,74 +377,96 @@ cs_error_t cpg_dispatch (
>  		 * A risk of this dispatch method is that the callback routines may
>  		 * operate at the same time that cpgFinalize has been called.
>  		 */
> -		memcpy (&callbacks, &cpg_inst->callbacks, sizeof (cpg_callbacks_t));
> -		/*
> -		 * Dispatch incoming message
> -		 */
> -		switch (dispatch_data->id) {
> -		case MESSAGE_RES_CPG_DELIVER_CALLBACK:
> -			if (callbacks.cpg_deliver_fn == NULL) {
> +		memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
> +
> +		switch (cpg_inst_copy.model_data.model) {
> +		case CPG_MODEL_V1:
> +			/*
> +			 * Dispatch incoming message
> +			 */
> +			switch (dispatch_data->id) {
> +			case MESSAGE_RES_CPG_DELIVER_CALLBACK:
> +				if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
> +					break;
> +				}
> +
> +				res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
> +
> +				marshall_from_mar_cpg_name_t (
> +					&group_name,
> +					&res_cpg_deliver_callback->group_name);
> +
> +				cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
> +					&group_name,
> +					res_cpg_deliver_callback->nodeid,
> +					res_cpg_deliver_callback->pid,
> +					&res_cpg_deliver_callback->message,
> +					res_cpg_deliver_callback->msglen);
>  				break;
> -			}
> -
> -			res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
>  
> -			marshall_from_mar_cpg_name_t (
> -				&group_name,
> -				&res_cpg_deliver_callback->group_name);
> +			case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
> +				if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
> +					break;
> +				}
> +
> +				res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
> +
> +				for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
> +					marshall_from_mar_cpg_address_t (&member_list[i],
> +						&res_cpg_confchg_callback->member_list[i]);
> +				}
> +				left_list_start = res_cpg_confchg_callback->member_list +
> +					res_cpg_confchg_callback->member_list_entries;
> +				for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
> +					marshall_from_mar_cpg_address_t (&left_list[i],
> +						&left_list_start[i]);
> +				}
> +				joined_list_start = res_cpg_confchg_callback->member_list +
> +					res_cpg_confchg_callback->member_list_entries +
> +					res_cpg_confchg_callback->left_list_entries;
> +				for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
> +					marshall_from_mar_cpg_address_t (&joined_list[i],
> +						&joined_list_start[i]);
> +				}
> +				marshall_from_mar_cpg_name_t (
> +					&group_name,
> +					&res_cpg_confchg_callback->group_name);
> +
> +				cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
> +					&group_name,
> +					member_list,
> +					res_cpg_confchg_callback->member_list_entries,
> +					left_list,
> +					res_cpg_confchg_callback->left_list_entries,
> +					joined_list,
> +					res_cpg_confchg_callback->joined_list_entries);
>  
> -			callbacks.cpg_deliver_fn (handle,
> -				&group_name,
> -				res_cpg_deliver_callback->nodeid,
> -				res_cpg_deliver_callback->pid,
> -				&res_cpg_deliver_callback->message,
> -				res_cpg_deliver_callback->msglen);
> -			break;
> -
> -		case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
> -			if (callbacks.cpg_confchg_fn == NULL) {
>  				break;
> -			}
> -
> -			res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
> -
> -			for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
> -				marshall_from_mar_cpg_address_t (&member_list[i],
> -					&res_cpg_confchg_callback->member_list[i]);
> -			}
> -			left_list_start = res_cpg_confchg_callback->member_list +
> -				res_cpg_confchg_callback->member_list_entries;
> -			for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
> -				marshall_from_mar_cpg_address_t (&left_list[i],
> -					&left_list_start[i]);
> -			}
> -			joined_list_start = res_cpg_confchg_callback->member_list +
> -				res_cpg_confchg_callback->member_list_entries +
> -				res_cpg_confchg_callback->left_list_entries;
> -			for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
> -				marshall_from_mar_cpg_address_t (&joined_list[i],
> -					&joined_list_start[i]);
> -			}
> -			marshall_from_mar_cpg_name_t (
> -				&group_name,
> -				&res_cpg_confchg_callback->group_name);
> -
> -			callbacks.cpg_confchg_fn (handle,
> -				&group_name,
> -				member_list,
> -				res_cpg_confchg_callback->member_list_entries,
> -				left_list,
> -				res_cpg_confchg_callback->left_list_entries,
> -				joined_list,
> -				res_cpg_confchg_callback->joined_list_entries);
> -			break;
> -
> -		default:
> -			coroipcc_dispatch_put (cpg_inst->handle);
> -			error = CS_ERR_LIBRARY;
> -			goto error_put;
> -			break;
> -		}
> +			case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
> +				if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
> +					break;
> +				}
> +
> +				res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
> +
> +				marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
> +				for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
> +					totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
> +				}
> +
> +				cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
> +					ring_id,
> +					res_cpg_totem_confchg_callback->member_list_entries,
> +					totem_member_list);
> +				break;
> +			default:
> +				coroipcc_dispatch_put (cpg_inst->handle);
> +				error = CS_ERR_LIBRARY;
> +				goto error_put;
> +				break;
> +			} /* - switch (dispatch_data->id) */
> +			break; /* case CPG_MODEL_V1 */
> +		} /* - switch (cpg_inst_copy.model_data.model) */
>  		coroipcc_dispatch_put (cpg_inst->handle);
>  
>  		/*
> @@ -434,6 +501,14 @@ cs_error_t cpg_join (
>  	req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
>  	req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
>  	req_lib_cpg_join.pid = getpid();
> +	req_lib_cpg_join.flags = 0;
> +
> +	switch (cpg_inst->model_data.model) {
> +	case CPG_MODEL_V1:
> +		req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
> +		break;
> +	}
> +
>  	marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
>  		group);
>  
> diff --git a/trunk/lib/libcpg.verso b/trunk/lib/libcpg.verso
> index 1454f6e..ee74734 100644
> --- a/trunk/lib/libcpg.verso
> +++ b/trunk/lib/libcpg.verso
> @@ -1 +1 @@
> -4.0.1
> +4.1.0
> diff --git a/trunk/man/Makefile.am b/trunk/man/Makefile.am
> index da01c2e..fe8f71b 100644
> --- a/trunk/man/Makefile.am
> +++ b/trunk/man/Makefile.am
> @@ -71,6 +71,7 @@ dist_man_MANS = \
>  	cpg_leave.3 \
>  	cpg_local_get.3 \
>  	cpg_mcast_joined.3 \
> +	cpg_model_initialize.3 \
>  	cpg_zcb_mcast_joined.3 \
>  	cpg_zcb_alloc.3 \
>  	cpg_zcb_free.3 \
> diff --git a/trunk/man/cpg_initialize.3 b/trunk/man/cpg_initialize.3
> index ce6e25a..6d4bc51 100644
> --- a/trunk/man/cpg_initialize.3
> +++ b/trunk/man/cpg_initialize.3
> @@ -41,7 +41,10 @@ cpg_initialize \- Create a new connection to the CPG service
>  .SH DESCRIPTION
>  The
>  .B cpg_initialize
> -function is used to initialize a connection to the closed process groups API.
> +function is used to initialize a connection to the closed process groups API. This function is deprecated
> +and
> +.B cpg_model_initialize
> +should be used in newly written code.
>  .PP
>  Each application may have several connections to the CPG API.  Each  application
>  uses the
> @@ -167,5 +170,6 @@ The errors are undocumented.
>  .BR cpg_context_get (3)
>  .BR cpg_context_set (3)
>  .BR cpg_local_get (3)
> +.BR cpg_model_initialize (3)
>  
>  .PP
> diff --git a/trunk/man/cpg_model_initialize.3 b/trunk/man/cpg_model_initialize.3
> new file mode 100644
> index 0000000..8ecf810
> --- /dev/null
> +++ b/trunk/man/cpg_model_initialize.3
> @@ -0,0 +1,227 @@
> +.\"/*
> +.\" * Copyright (c) 2010 Red Hat, Inc.
> +.\" *
> +.\" * All rights reserved.
> +.\" *
> +.\" * Author: Jan Friesse <jfriesse at redhat.com>
> +.\" * Author: Christine Caulfield <ccaulfie at redhat.com>
> +.\" *
> +.\" * This software licensed under BSD license, the text of which follows:
> +.\" *
> +.\" * Redistribution and use in source and binary forms, with or without
> +.\" * modification, are permitted provided that the following conditions are met:
> +.\" *
> +.\" * - Redistributions of source code must retain the above copyright notice,
> +.\" *   this list of conditions and the following disclaimer.
> +.\" * - Redistributions in binary form must reproduce the above copyright notice,
> +.\" *   this list of conditions and the following disclaimer in the documentation
> +.\" *   and/or other materials provided with the distribution.
> +.\" * - Neither the name of the MontaVista Software, Inc. nor the names of its
> +.\" *   contributors may be used to endorse or promote products derived from this
> +.\" *   software without specific prior written permission.
> +.\" *
> +.\" * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
> +.\" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
> +.\" * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
> +.\" * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
> +.\" * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
> +.\" * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> +.\" * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
> +.\" * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
> +.\" * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
> +.\" * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
> +.\" * THE POSSIBILITY OF SUCH DAMAGE.
> +.\" */
> +.TH CPG_MODEL_INITIALIZE 3 2010-04-07 "corosync Man Page" "Corosync Cluster Engine Programmer's Manual"
> +.SH NAME
> +cpg_model_initialize \- Create a new connection to the CPG service
> +.SH SYNOPSIS
> +.B #include <corosync/cpg.h>
> +.sp
> +.BI "cs_error_t cpg_model_initialize(cpg_handle_t *" handle ", cpg_model_t " model ", cpg_model_data_t *" model_data ", void *" context ");
> +
> +.SH DESCRIPTION
> +The
> +.B cpg_model_initialize
> +function is used to initialize a connection to the closed process groups API.
> +.PP
> +Each application may have several connections to the CPG API.  Each  application
> +uses the
> +.I handle
> +argument to uniquely identify the connection.  The
> +.I handle
> +argument is then used in other function calls to identify the connection to be used
> +for communication with the CPG service.
> +.PP
> +Argument
> +.I model
> +is used to explicitly choose set of callbacks and internal parameters. Currently only model
> +.I CPG_MODEL_V1
> +is defined.
> +.PP
> +Callbacks and internal parameters are passed by
> +.I model_data
> +argument. This is a cast pointer (the idea is similar to struct sockaddr in the socket functions) to one of the structures
> +corresponding to the chosen model. Currently only
> +.I cpg_model_v1_data_t
> +is needed.
> +.SH MODEL_V1
> +The
> +.I MODEL_V1
> +is backwards compatible with original callbacks initialized by
> +.I cpg_initialize
> +but new callback
> +.I cpg_totem_confchg_fn
> +is defined.
> +.PP
> +Every time a CPG event occurs within the joined group, one of the callbacks specified by the argument
> +.I callbacks
> +is called.  The callback functions are described by the following type definitions:
> +.PP
> +.PP
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.ta 4n 20n 32n
> +
> +typedef void (*cpg_deliver_fn_t) (
> +        cpg_handle_t handle,
> +        const struct cpg_name *group_name,
> +        uint32_t nodeid,
> +        uint32_t pid,
> +        const void *msg,
> +        size_t msg_len);
> +
> +
> +typedef void (*cpg_confchg_fn_t) (
> +        cpg_handle_t handle,
> +        const struct cpg_name *group_name,
> +        const struct cpg_address *member_list, size_t member_list_entries,
> +        const struct cpg_address *left_list, size_t left_list_entries,
> +        const struct cpg_address *joined_list, size_t joined_list_entries);
> +
> +
> +typedef void (*cpg_totem_confchg_fn_t) (
> +        cpg_handle_t handle,
> +        struct cpg_ring_id ring_id,
> +        uint32_t member_list_entries,
> +        const uint32_t *member_list);
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +.PP
> +The
> +.I cpg_model_v1_data_t
> +structure is defined as:
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.PP
> +typedef struct {
> +        cpg_model_t model;
> +        cpg_deliver_fn_t cpg_deliver_fn;
> +        cpg_confchg_fn_t cpg_confchg_fn;
> +        cpg_totem_confchg_fn_t cpg_totem_confchg_fn;
> +} cpg_model_v1_data_t;
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +When a configuration change occurs or a message is to be delivered one of the callbacks
> +is called from the
> +.B cpg_dispatch()
> +function.  If a configuration change occurs,
> +.I cpg_confchg_fn
> +is called.  If a delivery of a message occurs,
> +.I cpg_deliver_fn
> +is called.
> +When totem membership change occurs,
> +.I cpg_totem_confchg_fn
> +is called.
> +The
> +.I cpg_address
> +structure is defined
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.PP
> +struct cpg_address {
> +        unsigned int nodeid;
> +        unsigned int pid;
> +        unsigned int reason;
> +};
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +where nodeid is a 32 bit unique node identifier, pid is the process ID of the process that has joined/left the group
> +or sent the message, and reason is an integer code indicating why the node joined/left the group.
> +.PP
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.PP
> +CPG_REASON_JOIN     - the process joined a group using cpg_join().
> +CPG_REASON_LEAVE    - the process left a group using cpg_leave()
> +CPG_REASON_NODEDOWN - the process left a group because the node left the cluster.
> +CPG_REASON_NODEUP   - the process joined a group because it was already a member of a group on a node that has just joined the cluster
> +CPG_REASON_PROCDOWN - the process left a group without calling cpg_leave()
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +The
> +.I cpg_ring_id
> +structure is defined
> +.IP
> +.RS
> +.ne 18
> +.nf
> +.PP
> +struct cpg_ring_id {
> +        uint32_t nodeid;
> +        uint64_t seq;
> +};
> +.ta
> +.fi
> +.RE
> +.IP
> +.PP
> +where
> +.I nodeid
> +is the id of the node of the current Totem leader and seq is an increasing number.
> +
> +.PP
> +.SH RETURN VALUE
> +This call returns the CPG_OK value if successful, otherwise an error is returned.
> +.PP
> +.SH ERRORS
> +The errors are undocumented.
> +.SH "SEE ALSO"
> +.BR cpg_overview (8),
> +.BR cpg_initialize (3),
> +.BR cpg_finalize (3),
> +.BR cpg_fd_get (3),
> +.BR cpg_dispatch (3),
> +.BR cpg_join (3),
> +.BR cpg_leave (3),
> +.BR cpg_mcast_joined (3),
> +.BR cpg_membership_get (3)
> +.BR cpg_zcb_alloc (3)
> +.BR cpg_zcb_free (3)
> +.BR cpg_zcb_mcast_joined (3)
> +.BR cpg_context_get (3)
> +.BR cpg_context_set (3)
> +.BR cpg_local_get (3)
> +.BR cpg_model_initialize (3)
> +
> +.PP
> diff --git a/trunk/man/cpg_overview.8 b/trunk/man/cpg_overview.8
> index 84d4c3b..1f268a5 100644
> --- a/trunk/man/cpg_overview.8
> +++ b/trunk/man/cpg_overview.8
> @@ -61,6 +61,7 @@ access the corosync services.
>  .BR cpg_join (3),
>  .BR cpg_leave (3),
>  .BR cpg_mcast_joined (3),
> +.BR cpg_model_initialize (3),
>  .BR cpg_membership_get (3)
>  .BR cpg_zcb_alloc (3)
>  .BR cpg_zcb_free (3)
> diff --git a/trunk/services/cpg.c b/trunk/services/cpg.c
> index ede426f..829767a 100644
> --- a/trunk/services/cpg.c
> +++ b/trunk/services/cpg.c
> @@ -133,6 +133,8 @@ struct cpg_pd {
>   	mar_cpg_name_t group_name;
>  	uint32_t pid;
>  	enum cpd_state cpd_state;
> +	unsigned int flags;
> +	int initial_totem_conf_sent;
>  	struct list_head list;
>  	struct list_head iteration_instance_list_head;
>  };
> @@ -160,6 +162,8 @@ static struct corosync_api_v1 *api = NULL;
>  
>  static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
>  
> +static mar_cpg_ring_id_t last_sync_ring_id;
> +
>  struct process_info {
>  	unsigned int nodeid;
>  	uint32_t pid;
> @@ -255,6 +259,11 @@ static void cpg_sync_activate (void);
>  
>  static void cpg_sync_abort (void);
>  
> +static int notify_lib_totem_membership (
> +	void *conn,
> +	int member_list_entries,
> +	const unsigned int *member_list);
> +
>  /*
>   * Library Handler Definition
>   */
> @@ -432,6 +441,9 @@ static void cpg_sync_init_v2 (
>  		sizeof (unsigned int));
>  	my_member_list_entries = member_list_entries;
>  
> +	last_sync_ring_id.nodeid = ring_id->rep.nodeid;
> +	last_sync_ring_id.seq = ring_id->seq;
> +
>  	for (i = 0; i < my_member_list_entries; i++) {
>  		if (my_member_list[i] < lowest_nodeid) {
>  			lowest_nodeid = my_member_list[i];
> @@ -482,13 +494,50 @@ static void cpg_sync_activate (void)
>  	memcpy (my_old_member_list, my_member_list,
>  		my_member_list_entries * sizeof (unsigned int));
>  	my_old_member_list_entries = my_member_list_entries;
> +
> +	notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
>  }
>  
>  static void cpg_sync_abort (void)
>  {
>  }
>  
> +static int notify_lib_totem_membership (
> +	void *conn,
> +	int member_list_entries,
> +	const unsigned int *member_list)
> +{
> +	struct list_head *iter;
> +	char *buf;
> +	int size;
> +	struct res_lib_cpg_totem_confchg_callback *res;
> +
> +	size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
> +		sizeof(mar_uint32_t) * (member_list_entries);
> +	buf = alloca(size);
> +	if (!buf)
> +		return CPG_ERR_LIBRARY;
>  
> +	res = (struct res_lib_cpg_totem_confchg_callback *)buf;
> +	res->member_list_entries = member_list_entries;
> +	res->header.size = size;
> +	res->header.id = MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK;
> +	res->header.error = CS_OK;
> +
> +	memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
> +	memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
> +
> +	if (conn == NULL) {
> +		for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
> +			struct cpg_pd *cpg_pd = list_entry (iter, struct cpg_pd, list);
> +			api->ipc_dispatch_send (cpg_pd->conn, buf, size);
> +		}
> +	} else {
> +		api->ipc_dispatch_send (conn, buf, size);
> +	}
> +
> +	return CPG_OK;
> +}
>  
>  static int notify_lib_joinlist(
>  	const mar_cpg_name_t *group_name,
> @@ -604,6 +653,20 @@ static int notify_lib_joinlist(
>  		}
>  	}
>  
> +
> +	/*
> +	 * Traverse thru cpds and send totem membership for cpd, where it is not send yet
> +	 */
> +	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; iter = iter->next) {
> +		struct cpg_pd *cpd = list_entry (iter, struct cpg_pd, list);
> +
> +		if ((cpd->flags & CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF) && (cpd->initial_totem_conf_sent == 0)) {
> +			cpd->initial_totem_conf_sent = 1;
> +
> +			notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
> +		}
> +	}
> +
>  	return CPG_OK;
>  }
>  
> @@ -1093,6 +1156,7 @@ static void message_handler_req_lib_cpg_join (void *conn, const void *message)
>  		error = CPG_OK;
>  		cpd->cpd_state = CPD_STATE_JOIN_STARTED;
>  		cpd->pid = req_lib_cpg_join->pid;
> +		cpd->flags = req_lib_cpg_join->flags;
>  		memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
>  			sizeof (cpd->group_name));
>  
> diff --git a/trunk/test/testcpg.c b/trunk/test/testcpg.c
> index 2abe83d..c08406b 100644
> --- a/trunk/test/testcpg.c
> +++ b/trunk/test/testcpg.c
> @@ -132,9 +132,29 @@ static void ConfchgCallback (
>  	}
>  }
>  
> -static cpg_callbacks_t callbacks = {
> +static void TotemConfchgCallback (
> +	cpg_handle_t handle,
> +        struct cpg_ring_id ring_id,
> +        uint32_t member_list_entries,
> +        const uint32_t *member_list)
> +{
> +	int i;
> +
> +	printf("\nTotemConfchgCallback: ringid (%u.%llu)\n", ring_id.nodeid, ring_id.seq);
> +
> +	printf("active processors %lu: ",
> +	       (unsigned long int) member_list_entries);
> +	for (i=0; i<member_list_entries; i++) {
> +		printf("%d ", member_list[i]);
> +	}
> +	printf ("\n");
> +}
> +
> +static cpg_model_v1_data_t callbacks = {
>  	.cpg_deliver_fn =            DeliverCallback,
>  	.cpg_confchg_fn =            ConfchgCallback,
> +	.cpg_totem_confchg_fn =      TotemConfchgCallback,
> +	.flags =                     CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF,
>  };
>  
>  static void sigintr_handler (int signum) __attribute__((__noreturn__));
> @@ -170,7 +190,7 @@ int main (int argc, char *argv[]) {
>  		group_name.length = 6;
>  	}
>  
> -	result = cpg_initialize (&handle, &callbacks);
> +	result = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&callbacks, NULL);
>  	if (result != CS_OK) {
>  		printf ("Could not initialize Cluster Process Group API instance error %d\n", result);
>  		exit (1);
> _______________________________________________
> Openais mailing list
> Openais at lists.linux-foundation.org
> https://lists.linux-foundation.org/mailman/listinfo/openais



More information about the Openais mailing list