No subject


Fri Nov 20 15:01:09 PST 2009


reception to be delayed.=0A=0ADelay token processing for nodes with =
delivery congestion.=0A=0A If a node is processing its delivered messages =
slower than=0A other nodes (or one app is slower to receive than another =
app)=0A then our delivery queues will get congested. The solution here=0A =
is to delay the reception of the token so that the applications=0A get =
time to process their new messages.=0A=0A Results: much improved but =
...=0A If you really throttle the dispatch then the "token holding"=0A =
times out and IPC queues keep filling up (but much more slowly).=0A=0A=
Signed-off=
-by: Angus Salkeld <angus.salkeld at gmail.com>=0A---=0A exec/coroipcs.c      =
       |   20 ++++-=0A exec/main.c                 |    6 ++=0A exec/totems=
rp.c             |  193 ++++++++++++++++++++++++++++++++++---------=0A =
include/corosync/coroipcs.h |    1 +=0A 4 files changed, 178 insertions(+),=
 42 deletions(-)=0A=0Adiff --git a/exec/coroipcs.c b/exec/coroipcs.c=0Ainde=
x 54448cc..4379afa 100644=0A--- a/exec/coroipcs.c=0A+++ b/exec/coroipcs.c=
=0A@@ -1189,6 +1189,20 @@ static void memcpy_dwrap (struct conn_info =
*conn_info, void *msg, unsigned int l=0A 	conn_info->control_buffer->=
write =3D (write_idx + len) % conn_info->dispatch_size;=0A }=0A =0A+static =
int are_all_out_queues_empty (void)=0A+{=0A+	struct list_head *list;=0A+=
	struct conn_info *conn_info;=0A+=0A+	for (list =3D conn_info_lis=
t_head.next; list !=3D &conn_info_list_head;=0A+		list =3D =
list->next) {=0A+=0A+		conn_info =3D list_entry (list, struct =
conn_info, list);=0A+		if (!list_empty (&conn_info->outq_head))=0A=
+			return CS_FALSE;=0A+	}=0A+	return CS_TRUE;=0A+=
}=0A /**=0A  * simulate the behaviour in coroipcc.c=0A  */=0A@@ -1202,14 =
+1216,17 @@ static int flow_control_event_send (struct conn_info *conn_info=
, char event)=0A 	}=0A =0A 	if (conn_info->flow_control_state =
!=3D new_fc) {=0A+		conn_info->flow_control_state =3D =
new_fc;=0A 		if (new_fc =3D=3D 1) {=0A 			=
log_printf (LOGSYS_LEVEL_INFO, "Enabling flow control for %d, event =
%d\n",=0A 				conn_info->client_pid, event);=0A+	=
		api->ipc_queue_state_change (1 /* =3D=3D congested */);=0A =
		} else {=0A 			log_printf (LOGSYS_LEVEL_IN=
FO, "Disabling flow control for %d, event %d\n",=0A 				=
conn_info->client_pid, event);=0A+			if (are_all_out_que=
ues_empty ())=0A+				api->ipc_queue_state_change=
 (0 /* =3D=3D flushed*/);=0A 		}=0A-		conn_info->flow_con=
trol_state =3D new_fc;=0A 		api->stats_update_value (conn_info-=
>stats_handle, "flow_control",=0A 			&conn_info->flow_co=
ntrol_state,=0A 			sizeof(conn_info->flow_control_stat=
e));=0A@@ -1399,6 +1416,7 @@ static void msg_send_or_queue (void *conn, =
const struct iovec *iov, unsigned int=0A 		pthread_mutex_lock =
(&conn_info->mutex);=0A 		if (list_empty (&conn_info->outq_he=
ad)) {=0A 			conn_info->notify_flow_control_enabled =3D =
1;=0A+			api->ipc_queue_state_change (1 /* =3D=3D congested =
*/);=0A 			api->poll_dispatch_modify (conn_info->fd,=
=0A 				POLLIN|POLLOUT|POLLNVAL);=0A 		=
}=0Adiff --git a/exec/main.c b/exec/main.c=0Aindex ee31bf6..e56ff18 =
100644=0A--- a/exec/main.c=0A+++ b/exec/main.c=0A@@ -1054,6 +1054,11 @@ =
static void corosync_stats_decrement_value (hdb_handle_t handle,=0A 		=
&key_incr_dummy);=0A }=0A =0A+static void corosync_queue_state_change (int =
queue_state)=0A+{=0A+	totempg_event_signal (TOTEM_EVENT_DELIVERY_CONGESTE=
D, queue_state);=0A+}=0A+=0A static struct coroipcs_init_state_v2 =
ipc_init_state_v2 =3D {=0A 	.socket_name				=
=3D COROSYNC_SOCKET_NAME,=0A 	.sched_policy				=
=3D SCHED_OTHER,=0A@@ -1080,6 +1085,7 @@ static struct coroipcs_init_state_=
v2 ipc_init_state_v2 =3D {=0A 	.stats_update_value			=
=3D corosync_stats_update_value,=0A 	.stats_increment_value		=
=3D corosync_stats_increment_value,=0A 	.stats_decrement_value		=
=3D corosync_stats_decrement_value,=0A+	.ipc_queue_state_change		=
=3D corosync_queue_state_change,=0A };=0A =0A static void corosync_setsched=
uler (void)=0Adiff --git a/exec/totemsrp.c b/exec/totemsrp.c=0Aindex =
23a1732..9abb1cd 100644=0A--- a/exec/totemsrp.c=0A+++ b/exec/totemsrp.c=0A@=
@ -418,6 +418,8 @@ struct totemsrp_instance {=0A =0A 	poll_timer_handle =
timer_heartbeat_timeout;=0A =0A+	poll_timer_handle timer_delayed_orf=
_token_processing_timeout;=0A+=0A 	/*=0A 	 * Function and data used =
to log messages=0A 	 */=0A@@ -467,6 +469,10 @@ struct totemsrp_instance=
 {=0A =0A 	int my_token_held;=0A =0A+	int forward_token;=0A+=0A+	=
int delivery_congested;=0A+=0A 	unsigned long long token_ring_id_seq;=0A =
=0A 	unsigned int last_released;=0A@@ -505,6 +511,7 @@ struct totemsrp_i=
nstance {=0A 	void * token_recv_event_handle;=0A 	void * token_sent_e=
vent_handle;=0A 	char commit_token_storage[9000];=0A+	char =
orf_token_storage[1500];=0A };=0A =0A struct message_handlers {=0A@@ =
-607,6 +614,8 @@ static void timer_function_heartbeat_timeout (void =
*data);=0A static void timer_function_token_retransmit_timeout (void =
*data);=0A static void timer_function_token_hold_retransmit_timeout (void =
*data);=0A static void timer_function_merge_detect_timeout (void *data);=0A=
+static void timer_function_delayed_orf_token_processing_timeout (void =
*data);=0A+static void complete_orf_token_processing (void *data);=0A =0A =
void main_deliver_fn (=0A 	void *context,=0A@@ -665,6 +674,10 @@ =
static void totemsrp_instance_initialize (struct totemsrp_instance =
*instance)=0A 	instance->my_high_delivered =3D SEQNO_START_MSG;=0A =0A 	=
instance->commit_token =3D (struct memb_commit_token *)instance->commit_tok=
en_storage;=0A+=0A+	instance->delivery_congested =3D 0;=0A+=0A+	=
instance->timer_delayed_orf_token_processing_timeout =3D 0;=0A }=0A =0A =
static void main_token_seqid_get (=0A@@ -1476,6 +1489,15 @@ static void =
start_token_hold_retransmit_timeout (struct totemsrp_instance *insta=0A 	=
	&instance->timer_orf_token_hold_retransmit_timeout);=0A }=0A =
=0A+static void start_orf_token_delay_timeout (struct totemsrp_instance =
*instance, int timeout)=0A+{=0A+	poll_timer_add (instance->totemsrp_=
poll_handle,=0A+		timeout,=0A+		(void *)instance,=
=0A+		timer_function_delayed_orf_token_processing_timeout,=0A+	=
	&instance->timer_delayed_orf_token_processing_timeout);=0A+}=0A+=0A=
 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance=
 *instance)=0A {=0A 	poll_timer_delete (instance->totemsrp_poll_handle,=
=0A@@ -2090,13 +2112,55 @@ originated:=0A 	return;=0A }=0A =0A+static =
void timer_function_delayed_orf_token_processing_timeout (void *data)=0A+{=
=0A+	struct totemsrp_instance *instance =3D data;=0A+=0A+	if =
(instance->delivery_congested =3D=3D 1) {=0A+		log_printf =
(instance->totemsrp_log_level_debug,=0A+			"orf token =
delay timer expired: delivery congestion not cleared\n");=0A+	} else =
{=0A+		log_printf (instance->totemsrp_log_level_debug,=0A+		=
	"orf token delay timer expired: delivery congestion cleared\n");=0A=
+	}=0A+	instance->timer_delayed_orf_token_processing_timeout =3D =
0;=0A+	complete_orf_token_processing (instance);=0A+}=0A+=0A void =
totemsrp_event_signal (void *srp_context, enum totem_event_type type, int =
value)=0A {=0A 	struct totemsrp_instance *instance =3D (struct totemsrp_ins=
tance *)srp_context;=0A =0A-	token_hold_cancel_send (instance);=0A+	=
switch (type) {=0A+	case TOTEM_EVENT_NEW_MSG:=0A+		token_hold_=
cancel_send (instance);=0A+		break;=0A =0A-	return (0);=0A+	=
case TOTEM_EVENT_DELIVERY_CONGESTED:=0A+		if (value =3D=3D =
0)=0A+			instance->delivery_congested =3D 0;=0A+		=
else=0A+			instance->delivery_congested =3D 1;=0A+=0A+=
		log_printf (instance->totemsrp_log_level_debug,=0A+		=
	"delivery congestion event %d\n", instance->delivery_congested);=0A=
+=0A+		if (instance->timer_delayed_orf_token_processing_timeout =
!=3D 0 &&=0A+			instance->delivery_congested =3D=3D 0) =
{=0A+			poll_timer_delete (instance->totemsrp_poll_handle,=
=0A+				instance->timer_delayed_orf_token_processin=
g_timeout);=0A+			instance->timer_delayed_orf_token_processin=
g_timeout =3D 0;=0A+			log_printf (instance->totemsrp_log_=
level_debug,=0A+                            "Cleared delivery congestion\n"=
);=0A+=0A+			complete_orf_token_processing (instance);=
=0A+		}=0A+		break;=0A+	default:=0A+		=
log_printf (instance->totemsrp_log_level_debug,=0A+			=
"totem: unknown event %d signaled\n", type);=0A+		break;=0A+	=
}=0A }=0A =0A int totemsrp_mcast (=0A@@ -2587,8 +2651,7 @@ static void =
timer_function_merge_detect_timeout(void *data)=0A  */=0A static int =
token_send (=0A 	struct totemsrp_instance *instance,=0A-	struct =
orf_token *orf_token,=0A-	int forward_token)=0A+	struct orf_token =
*orf_token)=0A {=0A 	int res =3D 0;=0A 	unsigned int orf_token_size=
;=0A@@ -2601,7 +2664,7 @@ static int token_send (=0A 	orf_token->header.n=
odeid =3D instance->my_id.addr[0].nodeid;=0A 	assert (orf_token->header.n=
odeid);=0A =0A-	if (forward_token =3D=3D 0) {=0A+	if (instance->forwa=
rd_token =3D=3D 0) {=0A 		return (0);=0A 	}=0A =0A@@ -2676,7 =
+2739,8 @@ static int orf_token_send_initial (struct totemsrp_instance =
*instance)=0A =0A 	orf_token.rtr_list_entries =3D 0;=0A =0A-	=
res =3D token_send (instance, &orf_token, 1);=0A+	instance->forward_t=
oken =3D 1;=0A+	res =3D token_send (instance, &orf_token);=0A =0A 	=
return (res);=0A }=0A@@ -3309,10 +3373,8 @@ static int message_handler_orf_=
token (=0A 	size_t msg_len,=0A 	int endian_conversion_needed)=0A =
{=0A-	char token_storage[1500];=0A 	char token_convert[1500];=0A 	=
struct orf_token *token =3D NULL;=0A-	int forward_token;=0A 	unsigned =
int transmits_allowed;=0A 	unsigned int mcasted_retransmit;=0A 	=
unsigned int mcasted_regular;=0A@@ -3336,6 +3398,18 @@ static int =
message_handler_orf_token (=0A 	}=0A #endif=0A =0A+	if (instance->timer=
_delayed_orf_token_processing_timeout !=3D 0) {=0A+		log_printf =
(instance->totemsrp_log_level_debug,=0A+			"Received =
orf token whilst delay timer pending.\n");=0A+=0A+			=
poll_timer_delete (instance->totemsrp_poll_handle,=0A+				=
instance->timer_delayed_orf_token_processing_timeout);=0A+			=
instance->timer_delayed_orf_token_processing_timeout =3D 0;=0A+=0A+		=
	complete_orf_token_processing (instance);=0A+	}=0A+=0A+=0A 	if =
(endian_conversion_needed) {=0A 		orf_token_endian_convert =
((struct orf_token *)msg,=0A 			(struct orf_token =
*)token_convert);=0A@@ -3346,7 +3420,7 @@ static int message_handler_orf_to=
ken (=0A 	 * Make copy of token and retransmit list in case we =
have=0A 	 * to flush incoming messages from the kernel queue=0A 	 =
*/=0A-	token =3D (struct orf_token *)token_storage;=0A+	token =3D =
(struct orf_token *)instance->orf_token_storage;=0A 	memcpy (token, =
msg, sizeof (struct orf_token));=0A 	memcpy (&token->rtr_list[0], (char =
*)msg + sizeof (struct orf_token),=0A 		sizeof (struct rtr_item) * =
RETRANSMIT_ENTRIES_MAX);=0A@@ -3391,10 +3465,10 @@ static int message_handl=
er_orf_token (=0A 	 * Hold onto token when there is no activity on =
ring and=0A 	 * this processor is the ring rep=0A 	 */=0A-	forward_tok=
en =3D 1;=0A+	instance->forward_token =3D 1;=0A 	if (totemip_equal(&=
instance->my_ring_id.rep, &instance->my_id.addr[0])) {=0A 		if =
(instance->my_token_held) {=0A-			forward_token =3D 0;=0A+	=
		instance->forward_token =3D 0;=0A 		}=0A 	=
}=0A =0A@@ -3422,7 +3496,7 @@ static int message_handler_orf_token (=0A 	=
	if (memcmp (&token->ring_id, &instance->my_ring_id,=0A 			=
sizeof (struct memb_ring_id)) !=3D 0) {=0A =0A-			if =
((forward_token)=0A+			if ((instance->forward_token)=0A 	=
			&& instance->use_heartbeat) {=0A 			=
	reset_heartbeat_timeout(instance);=0A 			}=0A@@ =
-3446,7 +3520,7 @@ static int message_handler_orf_token (=0A 			=
 */=0A 			reset_token_timeout (instance);=0A =0A-			=
if ((forward_token)=0A+			if ((instance->forward_token)=0A 	=
			&& instance->use_heartbeat) {=0A 			=
	reset_heartbeat_timeout(instance);=0A 			}=0A@@ =
-3565,41 +3639,25 @@ printf ("token seq %d\n", token->seq);=0A 			=
		instance->my_retrans_flg_count =3D 0;=0A 			=
	}=0A 			}=0A-=0A-			totemrrp_se=
nd_flush (instance->totemrrp_context);=0A-			token_send =
(instance, token, forward_token);=0A-=0A-#ifdef GIVEINFO=0A-			=
tv_current =3D timerlist_nano_current_get ();=0A-			=
tv_diff =3D tv_current - tv_old;=0A-			tv_old =3D =
tv_current;=0A-			log_printf (instance->totemsrp_log_level_de=
bug,=0A-				"I held %0.4f ms\n",=0A-		=
		((float)tv_diff) / 1000000.0);=0A-#endif=0A-			=
if (instance->memb_state =3D=3D MEMB_STATE_OPERATIONAL) {=0A-			=
	messages_deliver_to_app (instance, 0,=0A-				=
	instance->my_high_seq_received);=0A-			}=0A-=0A 	=
		/*=0A-			 * Deliver messages after token =
has been transmitted=0A-			 * to improve performance=
=0A+			 * delay token processing for nodes with delivery =
congestion=0A+			 * If a node is processing it's delivered =
messages slower than=0A+			 * other nodes (or one app =
is slower to receive than another app)=0A+			 * then =
our delivery queues will get congested. The solution here=0A+			=
 * is to delay the reception of the token so that the applications=0A+		=
	 * get time to process their new messages.=0A 			 =
*/=0A-			reset_token_timeout (instance); // REVIEWED=0A-		=
	reset_token_retransmit_timeout (instance); // REVIEWED=0A-		=
	if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.r=
ep) &&=0A-				instance->my_token_held =3D=3D 1) =
{=0A-=0A-				start_token_hold_retransmit_timeout=
 (instance);=0A+			if (instance->delivery_congested =
=3D=3D 1) {=0A+				start_orf_token_delay_timeout =
(instance, instance->totem_config->token_timeout - 250);=0A+			=
} else {=0A+				complete_orf_token_processing =
(instance);=0A 			}=0A-=0A-			token_callb=
acks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);=0A+			=
return (0);=0A 		}=0A 		break;=0A 	}=0A =0A-	if =
((forward_token)=0A+	if ((instance->forward_token)=0A 		&& =
instance->use_heartbeat) {=0A 		reset_heartbeat_timeout(instance);=
=0A 	}=0A@@ -3608,6 +3666,59 @@ printf ("token seq %d\n", token->seq);=
=0A 	}=0A =0A 	return (0);=0A+=0A+}=0A+=0A+static void complete_or=
f_token_processing (void *data)=0A+{=0A+	struct totemsrp_instance =
*instance =3D data;=0A+	struct orf_token *token =3D NULL;=0A+#ifdef =
GIVEINFO=0A+	unsigned long long tv_current;=0A+	unsigned long long =
tv_diff;=0A+#endif=0A+=0A+	token =3D (struct orf_token *)instance->orf=
_token_storage;=0A+=0A+	totemrrp_send_flush (instance->totemrrp_context);=
=0A+	token_send (instance, token);=0A+=0A+#ifdef GIVEINFO=0A+	=
tv_current =3D timerlist_nano_current_get ();=0A+	tv_diff =3D =
tv_current - tv_old;=0A+	tv_old =3D tv_current;=0A+	log_printf =
(instance->totemsrp_log_level_debug,=0A+		"I held %0.4f =
ms\n",=0A+		((float)tv_diff) / 1000000.0);=0A+#endif=0A+	if =
(instance->memb_state =3D=3D MEMB_STATE_OPERATIONAL) {=0A+		=
messages_deliver_to_app (instance, 0,=0A+			instance->m=
y_high_seq_received);=0A+	}=0A+=0A+	/*=0A+	 * Deliver =
messages after token has been transmitted=0A+	 * to improve performance=
=0A+	 */=0A+	reset_token_timeout (instance); // REVIEWED=0A+	reset_token=
_retransmit_timeout (instance); // REVIEWED=0A+	if (totemip_equal(&instance=
->my_id.addr[0], &instance->my_ring_id.rep) &&=0A+		instance->m=
y_token_held =3D=3D 1) {=0A+=0A+		start_token_hold_retransmit=
_timeout (instance);=0A+	}=0A+=0A+	token_callbacks_execute =
(instance, TOTEM_CALLBACK_TOKEN_SENT);=0A+=0A+=0A+	if ((instance->forw=
ard_token)=0A+		&& instance->use_heartbeat) {=0A+		=
reset_heartbeat_timeout(instance);=0A+	}=0A+	else {=0A+		=
cancel_heartbeat_timeout(instance);=0A+	}=0A+	return;=0A }=0A =0A static =
void messages_deliver_to_app (=0Adiff --git a/include/corosync/coroipcs.h =
b/include/corosync/coroipcs.h=0Aindex 3838af8..9904609 100644=0A--- =
a/include/corosync/coroipcs.h=0A+++ b/include/corosync/coroipcs.h=0A@@ =
-116,6 +116,7 @@ struct coroipcs_init_state_v2 {=0A 		...) =
__attribute__((format(printf, 5, 6)));=0A 	int log_subsys_id;=0A 	=
void (*stats_decrement_value) (hdb_handle_t handle, const char* name);=0A+	=
void (*ipc_queue_state_change) (int queue_state);=0A };=0A =0A extern void =
coroipcs_ipc_init (=0A-- =0A1.6.3.4=0A=0A
--=__Part9FB5CDB0.0__=--


More information about the Openais mailing list