[Openais] [PATCH-COROSYNC (resend)] Propagate the flow control state between AIS exec and library

Angus & Anna Salkeld ahsalkeld at gmail.com
Wed Sep 17 02:57:45 PDT 2008


This patch causes the flow control state in the library to be updated
when flow control is turned off (disabled) in the exec, so that the
flow control APIs report the correct state.
This also fixes the case where the application is no longer sending
messages and it has already dispatched all its received messages
before flow control is disabled.

Also, CPG response messages with a TRY_AGAIN error did NOT contain
a valid flow control state value. This meant the library could get
stuck with flow control enabled (flow control was never enabled
for the EXEC, so no disable event occurred).
This case was hit when a new node was joining - sync_in_process()
resulted in a TRY_AGAIN error for cpg_mcast_joined().

Also, in message_handler_req_exec_cpg_mcast() the state passed
back to the library defaulted to disabled for messages received
from another node (even if flow control was still enabled)
- this meant if multiple nodes were sending CPG messages,
  then the library flow control state flip-flopped between
  enabled and disabled.

Author: Steven Dake <sdake at redhat.com> &
        Tim Beale <tim.beale at alliedtelesis.co.nz>
---
 exec/apidef.c                     |    1 +
 exec/ipc.c                        |   17 ++++++++++++++++-
 exec/ipc.h                        |    2 ++
 include/corosync/engine/coroapi.h |    2 ++
 include/corosync/ipc_cpg.h        |    8 +++++++-
 lib/cpg.c                         |   19 ++++++++++++++++---
 services/cpg.c                    |   23 ++++++++++++++++++++---
 7 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/exec/apidef.c b/exec/apidef.c
index d24c307..514f162 100644
--- a/exec/apidef.c
+++ b/exec/apidef.c
@@ -69,6 +69,7 @@ static struct corosync_api_v1 apidef_corosync_api_v1 = {
 	.ipc_source_is_local = message_source_is_local,
 	.ipc_private_data_get = corosync_conn_private_data_get,
 	.ipc_response_send = NULL,
+	.ipc_response_no_fcc = corosync_conn_send_response_no_fcc,
 	.ipc_dispatch_send = NULL,
 	.ipc_conn_send_response = corosync_conn_send_response,
 	.ipc_conn_partner_get = corosync_conn_partner_get,
diff --git a/exec/ipc.c b/exec/ipc.c
index 977f60c..0471247 100644
--- a/exec/ipc.c
+++ b/exec/ipc.c
@@ -111,6 +111,8 @@ LOGSYS_DECLARE_SUBSYS ("IPC", LOG_INFO);

 static unsigned int g_gid_valid = 0;

+static unsigned int dont_call_flow_control = 0;
+
 static totempg_groups_handle ipc_handle;

 DECLARE_LIST_INIT (conn_info_list_head);
@@ -1125,6 +1127,17 @@ void *corosync_conn_partner_get (void *conn)
 	}
 }

+/* corosync_conn_send_response() without the ipc_flow_control() call. */
+int corosync_conn_send_response_no_fcc (void *conn, void *msg, int mlen)
+{
+	int res;
+
+	dont_call_flow_control = 1;
+	res = corosync_conn_send_response (conn, msg, mlen);
+	dont_call_flow_control = 0;
+	return (res);	/* hand the send result back to the caller */
+}
+
 int corosync_conn_send_response (
 	void *conn,
 	void *msg,
@@ -1149,7 +1162,9 @@ int corosync_conn_send_response (
 		return (-1);
 	}

-	ipc_flow_control (conn_info);
+	if (dont_call_flow_control == 0) {
+		ipc_flow_control (conn_info);
+	}

 	outq = &conn_info->outq;

diff --git a/exec/ipc.h b/exec/ipc.h
index a29a698..fc24241 100644
--- a/exec/ipc.h
+++ b/exec/ipc.h
@@ -52,6 +52,8 @@ extern void *corosync_conn_private_data_get (void *conn);

 extern int corosync_conn_send_response (void *conn, void *msg, int mlen);

+extern int corosync_conn_send_response_no_fcc (void *conn, void *msg,
int mlen);
+
 extern void corosync_ipc_init (
         void (*serialize_lock_fn) (void),
         void (*serialize_unlock_fn) (void),
diff --git a/include/corosync/engine/coroapi.h
b/include/corosync/engine/coroapi.h
index 35757ea..1b9dae2 100644
--- a/include/corosync/engine/coroapi.h
+++ b/include/corosync/engine/coroapi.h
@@ -324,6 +324,8 @@ struct corosync_api_v1 {

 	int (*ipc_response_send) (void *conn, void *msg, int mlen);

+	int (*ipc_response_no_fcc) (void *conn, void *msg, int mlen);
+
 	int (*ipc_dispatch_send) (void *conn, void *msg, int mlen);

 	/*
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index 32b8544..c1e68be 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -62,7 +62,8 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_FLOW_CONTROL_STATE_SET = 8,
 	MESSAGE_RES_CPG_LOCAL_GET = 9,
 	MESSAGE_RES_CPG_GROUPS_GET = 10,
-	MESSAGE_RES_CPG_GROUPS_CALLBACK = 11
+	MESSAGE_RES_CPG_GROUPS_CALLBACK = 11,
+	MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK = 12
 };

 enum lib_cpg_confchg_reason {
@@ -135,6 +136,11 @@ struct res_lib_cpg_deliver_callback {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };

+struct res_lib_cpg_flowcontrol_callback {
+	mar_res_header_t header __attribute__((aligned(8)));
+	mar_uint32_t flow_control_state __attribute__((aligned(8)));
+};
+
 struct req_lib_cpg_membership {
 	mar_req_header_t header __attribute__((aligned(8)));
 	mar_cpg_name_t group_name __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 6509b7a..89390f7 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -248,6 +248,7 @@ cpg_error_t cpg_dispatch (
 	int cont = 1; /* always continue do loop except when set to 0 */
 	int dispatch_avail;
 	struct cpg_inst *cpg_inst;
+	struct res_lib_cpg_flowcontrol_callback *res_cpg_flowcontrol_callback;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
 	struct res_lib_cpg_groups_get_callback *res_lib_cpg_groups_get_callback;
@@ -397,6 +398,7 @@ cpg_error_t cpg_dispatch (
 				joined_list,
 				res_cpg_confchg_callback->joined_list_entries);
 			break;
+
 		case MESSAGE_RES_CPG_GROUPS_CALLBACK:
 			res_lib_cpg_groups_get_callback = (struct
res_lib_cpg_groups_get_callback *)&dispatch_data;
 			marshall_from_mar_cpg_name_t (
@@ -413,6 +415,12 @@ cpg_error_t cpg_dispatch (
 						    &group_name,
 						    member_list,
 						    res_lib_cpg_groups_get_callback->num_members);
+
+			break;
+
+		case MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK:
+			res_cpg_flowcontrol_callback = (struct
res_lib_cpg_flowcontrol_callback *)&dispatch_data;
+			cpg_inst->flow_control_state =
res_cpg_flowcontrol_callback->flow_control_state;
 			break;

 		default:
@@ -598,9 +606,14 @@ cpg_error_t cpg_mcast_joined (
 		goto error_exit;
 	}

-	cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
-	if (res_lib_cpg_mcast.header.error == CPG_ERR_TRY_AGAIN) {
-		cpg_inst->flow_control_state = CPG_FLOW_CONTROL_ENABLED;
+/*	Only update the flow control state when the return value is OK.
+ *	Otherwise the flow control state is not guaranteed to be valid in the
+ *	return message.
+ *	Also, don't set to ENABLED if the return value is TRY_AGAIN as this can lead
+ *	to Flow Control State sync issues between AIS LIB and EXEC.
+ */
+	if (res_lib_cpg_mcast.header.error == CPG_OK) {
+		cpg_inst->flow_control_state = res_lib_cpg_mcast.flow_control_state;
 	}
 	error = res_lib_cpg_mcast.header.error;

diff --git a/services/cpg.c b/services/cpg.c
index d5d37de..01405e0 100644
--- a/services/cpg.c
+++ b/services/cpg.c
@@ -738,9 +738,28 @@ static void cpg_flow_control_state_set_fn (
 	void *context,
 	enum corosync_flow_control_state flow_control_state)
 {
+	struct res_lib_cpg_flowcontrol_callback res_lib_cpg_flowcontrol_callback;
 	struct process_info *process_info = (struct process_info *)context;

 	process_info->flow_control_state = flow_control_state;
+	/*
+	 * Send disabled flow control if a disabled occurs.  This prevents
+	 * the condition where a disabled occurs after all messages have been
+	 * delivered and then there is no valid way to retrieve the flow
+	 * control state
+	 */
+	if (flow_control_state == CPG_FLOW_CONTROL_DISABLED) {
+		res_lib_cpg_flowcontrol_callback.header.id =
MESSAGE_RES_CPG_FLOWCONTROL_CALLBACK;
+		res_lib_cpg_flowcontrol_callback.header.size = sizeof (struct
res_lib_cpg_flowcontrol_callback);
+		res_lib_cpg_flowcontrol_callback.flow_control_state = flow_control_state;
+
+		if (process_info->trackerconn) {
+			api->ipc_response_no_fcc (
+				process_info->trackerconn,
+				&res_lib_cpg_flowcontrol_callback,
+				sizeof (struct res_lib_cpg_flowcontrol_callback));
+		}
+	}
 }

 /* Can byteswap join & leave messages */
@@ -965,7 +984,6 @@ static void message_handler_req_exec_cpg_mcast (
 {
 	struct req_exec_cpg_mcast *req_exec_cpg_mcast = (struct
req_exec_cpg_mcast *)message;
 	struct res_lib_cpg_deliver_callback *res_lib_cpg_mcast;
-	struct process_info *process_info;
 	int msglen = req_exec_cpg_mcast->msglen;
 	char buf[sizeof(*res_lib_cpg_mcast) + msglen];
 	struct group_info *gi;
@@ -986,8 +1004,6 @@ static void message_handler_req_exec_cpg_mcast (
 	res_lib_cpg_mcast->flow_control_state = CPG_FLOW_CONTROL_DISABLED;
 	if (api->ipc_source_is_local (&req_exec_cpg_mcast->source)) {
 		api->ipc_refcnt_dec (req_exec_cpg_mcast->source.conn);
-		process_info = (struct process_info *)api->ipc_private_data_get
(req_exec_cpg_mcast->source.conn);
-		res_lib_cpg_mcast->flow_control_state = process_info->flow_control_state;
 	}
 	memcpy(&res_lib_cpg_mcast->group_name, &gi->group_name,
 		sizeof(mar_cpg_name_t));
@@ -998,6 +1014,7 @@ static void message_handler_req_exec_cpg_mcast (
 	for (iter = gi->members.next; iter != &gi->members; iter = iter->next) {
 		struct process_info *pi = list_entry(iter, struct process_info, list);
 		if (pi->trackerconn && (pi->flags & PI_FLAG_MEMBER)) {
+			res_lib_cpg_mcast->flow_control_state = pi->flow_control_state;
 			api->ipc_conn_send_response(
 				pi->trackerconn,
 				buf,
--


More information about the Openais mailing list