No subject
Fri Nov 20 15:01:09 PST 2009
reception to be delayed.=0A=0ADelay token processing for nodes with =
delivery congestion.=0A=0A If a node is processing its delivered messages =
slower than=0A other nodes (or one app is slower to receive than another =
app)=0A then our delivery queues will get congested. The solution here=0A =
is to delay the reception of the token so that the applications=0A get =
time to process their new messages.=0A=0A Results: much improved but =
...=0A If you really throttle the dispatch then the "token holding"=0A =
times out and ipc queues keep filling up (but much slower).=0A=0ASigned-off=
-by: Angus Salkeld <angus.salkeld at gmail.com>=0A---=0A exec/coroipcs.c =
| 20 ++++-=0A exec/main.c | 6 ++=0A exec/totems=
rp.c | 193 ++++++++++++++++++++++++++++++++++---------=0A =
include/corosync/coroipcs.h | 1 +=0A 4 files changed, 178 insertions(+),=
42 deletions(-)=0A=0Adiff --git a/exec/coroipcs.c b/exec/coroipcs.c=0Ainde=
x 54448cc..4379afa 100644=0A--- a/exec/coroipcs.c=0A+++ b/exec/coroipcs.c=
=0A@@ -1189,6 +1189,20 @@ static void memcpy_dwrap (struct conn_info =
*conn_info, void *msg, unsigned int l=0A conn_info->control_buffer->=
write =3D (write_idx + len) % conn_info->dispatch_size;=0A }=0A =0A+static =
int are_all_out_queues_empty (void)=0A+{=0A+ struct list_head *list;=0A+=
struct conn_info *conn_info;=0A+=0A+ for (list =3D conn_info_lis=
t_head.next; list !=3D &conn_info_list_head;=0A+ list =3D =
list->next) {=0A+=0A+ conn_info =3D list_entry (list, struct =
conn_info, list);=0A+ if (!list_empty (&conn_info->outq_head))=0A=
+ return CS_FALSE;=0A+ }=0A+ return CS_TRUE;=0A+=
}=0A /**=0A * simulate the behaviour in coroipcc.c=0A */=0A@@ -1202,14 =
+1216,17 @@ static int flow_control_event_send (struct conn_info *conn_info=
, char event)=0A }=0A =0A if (conn_info->flow_control_state =
!=3D new_fc) {=0A+ conn_info->flow_control_state =3D =
new_fc;=0A if (new_fc =3D=3D 1) {=0A =
log_printf (LOGSYS_LEVEL_INFO, "Enabling flow control for %d, event =
%d\n",=0A conn_info->client_pid, event);=0A+ =
api->ipc_queue_state_change (1 /* =3D=3D congested */);=0A =
} else {=0A log_printf (LOGSYS_LEVEL_IN=
FO, "Disabling flow control for %d, event %d\n",=0A =
conn_info->client_pid, event);=0A+ if (are_all_out_que=
ues_empty ())=0A+ api->ipc_queue_state_change=
(0 /* =3D=3D flushed*/);=0A }=0A- conn_info->flow_con=
trol_state =3D new_fc;=0A api->stats_update_value (conn_info-=
>stats_handle, "flow_control",=0A &conn_info->flow_co=
ntrol_state,=0A sizeof(conn_info->flow_control_stat=
e));=0A@@ -1399,6 +1416,7 @@ static void msg_send_or_queue (void *conn, =
const struct iovec *iov, unsigned int=0A pthread_mutex_lock =
(&conn_info->mutex);=0A if (list_empty (&conn_info->outq_he=
ad)) {=0A conn_info->notify_flow_control_enabled =3D =
1;=0A+ api->ipc_queue_state_change (1 /* =3D=3D congested =
*/);=0A api->poll_dispatch_modify (conn_info->fd,=
=0A POLLIN|POLLOUT|POLLNVAL);=0A =
}=0Adiff --git a/exec/main.c b/exec/main.c=0Aindex ee31bf6..e56ff18 =
100644=0A--- a/exec/main.c=0A+++ b/exec/main.c=0A@@ -1054,6 +1054,11 @@ =
static void corosync_stats_decrement_value (hdb_handle_t handle,=0A =
&key_incr_dummy);=0A }=0A =0A+static void corosync_queue_state_change (int =
queue_state)=0A+{=0A+ totempg_event_signal (TOTEM_EVENT_DELIVERY_CONGESTE=
D, queue_state);=0A+}=0A+=0A static struct coroipcs_init_state_v2 =
ipc_init_state_v2 =3D {=0A .socket_name =
=3D COROSYNC_SOCKET_NAME,=0A .sched_policy =
=3D SCHED_OTHER,=0A@@ -1080,6 +1085,7 @@ static struct coroipcs_init_state_=
v2 ipc_init_state_v2 =3D {=0A .stats_update_value =
=3D corosync_stats_update_value,=0A .stats_increment_value =
=3D corosync_stats_increment_value,=0A .stats_decrement_value =
=3D corosync_stats_decrement_value,=0A+ .ipc_queue_state_change =
=3D corosync_queue_state_change,=0A };=0A =0A static void corosync_setsched=
uler (void)=0Adiff --git a/exec/totemsrp.c b/exec/totemsrp.c=0Aindex =
23a1732..9abb1cd 100644=0A--- a/exec/totemsrp.c=0A+++ b/exec/totemsrp.c=0A@=
@ -418,6 +418,8 @@ struct totemsrp_instance {=0A =0A poll_timer_handle =
timer_heartbeat_timeout;=0A =0A+ poll_timer_handle timer_delayed_orf=
_token_processing_timeout;=0A+=0A /*=0A * Function and data used =
to log messages=0A */=0A@@ -467,6 +469,10 @@ struct totemsrp_instance=
{=0A =0A int my_token_held;=0A =0A+ int forward_token;=0A+=0A+ =
int delivery_congested;=0A+=0A unsigned long long token_ring_id_seq;=0A =
=0A unsigned int last_released;=0A@@ -505,6 +511,7 @@ struct totemsrp_i=
nstance {=0A void * token_recv_event_handle;=0A void * token_sent_e=
vent_handle;=0A char commit_token_storage[9000];=0A+ char =
orf_token_storage[1500];=0A };=0A =0A struct message_handlers {=0A@@ =
-607,6 +614,8 @@ static void timer_function_heartbeat_timeout (void =
*data);=0A static void timer_function_token_retransmit_timeout (void =
*data);=0A static void timer_function_token_hold_retransmit_timeout (void =
*data);=0A static void timer_function_merge_detect_timeout (void *data);=0A=
+static void timer_function_delayed_orf_token_processing_timeout (void =
*data);=0A+static void complete_orf_token_processing (void *data);=0A =0A =
void main_deliver_fn (=0A void *context,=0A@@ -665,6 +674,10 @@ =
static void totemsrp_instance_initialize (struct totemsrp_instance =
*instance)=0A instance->my_high_delivered =3D SEQNO_START_MSG;=0A =0A =
instance->commit_token =3D (struct memb_commit_token *)instance->commit_tok=
en_storage;=0A+=0A+ instance->delivery_congested =3D 0;=0A+=0A+ =
instance->timer_delayed_orf_token_processing_timeout =3D 0;=0A }=0A =0A =
static void main_token_seqid_get (=0A@@ -1476,6 +1489,15 @@ static void =
start_token_hold_retransmit_timeout (struct totemsrp_instance *insta=0A =
&instance->timer_orf_token_hold_retransmit_timeout);=0A }=0A =
=0A+static void start_orf_token_delay_timeout (struct totemsrp_instance =
*instance, int timeout)=0A+{=0A+ poll_timer_add (instance->totemsrp_=
poll_handle,=0A+ timeout,=0A+ (void *)instance,=
=0A+ timer_function_delayed_orf_token_processing_timeout,=0A+ =
&instance->timer_delayed_orf_token_processing_timeout);=0A+}=0A+=0A=
static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance=
*instance)=0A {=0A poll_timer_delete (instance->totemsrp_poll_handle,=
=0A@@ -2090,13 +2112,55 @@ originated:=0A return;=0A }=0A =0A+static =
void timer_function_delayed_orf_token_processing_timeout (void *data)=0A+{=
=0A+ struct totemsrp_instance *instance =3D data;=0A+=0A+ if =
(instance->delivery_congested =3D=3D 1) {=0A+ log_printf =
(instance->totemsrp_log_level_debug,=0A+ "orf token =
delay timer expired: delivery congestion not cleared\n");=0A+ } else =
{=0A+ log_printf (instance->totemsrp_log_level_debug,=0A+ =
"orf token delay timer expired: delivery congestion cleared\n");=0A=
+ }=0A+ instance->timer_delayed_orf_token_processing_timeout =3D =
0;=0A+ complete_orf_token_processing (instance);=0A+}=0A+=0A void =
totemsrp_event_signal (void *srp_context, enum totem_event_type type, int =
value)=0A {=0A struct totemsrp_instance *instance =3D (struct totemsrp_ins=
tance *)srp_context;=0A =0A- token_hold_cancel_send (instance);=0A+ =
switch (type) {=0A+ case TOTEM_EVENT_NEW_MSG:=0A+ token_hold_=
cancel_send (instance);=0A+ break;=0A =0A- return (0);=0A+ =
case TOTEM_EVENT_DELIVERY_CONGESTED:=0A+ if (value =3D=3D =
0)=0A+ instance->delivery_congested =3D 0;=0A+ =
else=0A+ instance->delivery_congested =3D 1;=0A+=0A+=
log_printf (instance->totemsrp_log_level_debug,=0A+ =
"delivery congestion event %d\n", instance->delivery_congested);=0A=
+=0A+ if (instance->timer_delayed_orf_token_processing_timeout =
!=3D 0 &&=0A+ instance->delivery_congested =3D=3D 0) =
{=0A+ poll_timer_delete (instance->totemsrp_poll_handle,=
=0A+ instance->timer_delayed_orf_token_processin=
g_timeout);=0A+ instance->timer_delayed_orf_token_processin=
g_timeout =3D 0;=0A+ log_printf (instance->totemsrp_log_=
level_debug,=0A+ "Cleared delivery congestion\n"=
);=0A+=0A+ complete_orf_token_processing (instance);=
=0A+ }=0A+ break;=0A+ default:=0A+ =
log_printf (instance->totemsrp_log_level_debug,=0A+ =
"totem: unknown event %d signaled\n", type);=0A+ break;=0A+ =
}=0A }=0A =0A int totemsrp_mcast (=0A@@ -2587,8 +2651,7 @@ static void =
timer_function_merge_detect_timeout(void *data)=0A */=0A static int =
token_send (=0A struct totemsrp_instance *instance,=0A- struct =
orf_token *orf_token,=0A- int forward_token)=0A+ struct orf_token =
*orf_token)=0A {=0A int res =3D 0;=0A unsigned int orf_token_size=
;=0A@@ -2601,7 +2664,7 @@ static int token_send (=0A orf_token->header.n=
odeid =3D instance->my_id.addr[0].nodeid;=0A assert (orf_token->header.n=
odeid);=0A =0A- if (forward_token =3D=3D 0) {=0A+ if (instance->forwa=
rd_token =3D=3D 0) {=0A return (0);=0A }=0A =0A@@ -2676,7 =
+2739,8 @@ static int orf_token_send_initial (struct totemsrp_instance =
*instance)=0A =0A orf_token.rtr_list_entries =3D 0;=0A =0A- =
res =3D token_send (instance, &orf_token, 1);=0A+ instance->forward_t=
oken =3D 1;=0A+ res =3D token_send (instance, &orf_token);=0A =0A =
return (res);=0A }=0A@@ -3309,10 +3373,8 @@ static int message_handler_orf_=
token (=0A size_t msg_len,=0A int endian_conversion_needed)=0A =
{=0A- char token_storage[1500];=0A char token_convert[1500];=0A =
struct orf_token *token =3D NULL;=0A- int forward_token;=0A unsigned =
int transmits_allowed;=0A unsigned int mcasted_retransmit;=0A =
unsigned int mcasted_regular;=0A@@ -3336,6 +3398,18 @@ static int =
message_handler_orf_token (=0A }=0A #endif=0A =0A+ if (instance->timer=
_delayed_orf_token_processing_timeout !=3D 0) {=0A+ log_printf =
(instance->totemsrp_log_level_debug,=0A+ "Received =
orf token whilst delay timer pending.\n");=0A+=0A+ =
poll_timer_delete (instance->totemsrp_poll_handle,=0A+ =
instance->timer_delayed_orf_token_processing_timeout);=0A+ =
instance->timer_delayed_orf_token_processing_timeout =3D 0;=0A+=0A+ =
complete_orf_token_processing (instance);=0A+ }=0A+=0A+=0A if =
(endian_conversion_needed) {=0A orf_token_endian_convert =
((struct orf_token *)msg,=0A (struct orf_token =
*)token_convert);=0A@@ -3346,7 +3420,7 @@ static int message_handler_orf_to=
ken (=0A * Make copy of token and retransmit list in case we =
have=0A * to flush incoming messages from the kernel queue=0A =
*/=0A- token =3D (struct orf_token *)token_storage;=0A+ token =3D =
(struct orf_token *)instance->orf_token_storage;=0A memcpy (token, =
msg, sizeof (struct orf_token));=0A memcpy (&token->rtr_list[0], (char =
*)msg + sizeof (struct orf_token),=0A sizeof (struct rtr_item) * =
RETRANSMIT_ENTRIES_MAX);=0A@@ -3391,10 +3465,10 @@ static int message_handl=
er_orf_token (=0A * Hold onto token when there is no activity on =
ring and=0A * this processor is the ring rep=0A */=0A- forward_tok=
en =3D 1;=0A+ instance->forward_token =3D 1;=0A if (totemip_equal(&=
instance->my_ring_id.rep, &instance->my_id.addr[0])) {=0A if =
(instance->my_token_held) {=0A- forward_token =3D 0;=0A+ =
instance->forward_token =3D 0;=0A }=0A =
}=0A =0A@@ -3422,7 +3496,7 @@ static int message_handler_orf_token (=0A =
if (memcmp (&token->ring_id, &instance->my_ring_id,=0A =
sizeof (struct memb_ring_id)) !=3D 0) {=0A =0A- if =
((forward_token)=0A+ if ((instance->forward_token)=0A =
&& instance->use_heartbeat) {=0A =
reset_heartbeat_timeout(instance);=0A }=0A@@ =
-3446,7 +3520,7 @@ static int message_handler_orf_token (=0A =
*/=0A reset_token_timeout (instance);=0A =0A- =
if ((forward_token)=0A+ if ((instance->forward_token)=0A =
&& instance->use_heartbeat) {=0A =
reset_heartbeat_timeout(instance);=0A }=0A@@ =
-3565,41 +3639,25 @@ printf ("token seq %d\n", token->seq);=0A =
instance->my_retrans_flg_count =3D 0;=0A =
}=0A }=0A-=0A- totemrrp_se=
nd_flush (instance->totemrrp_context);=0A- token_send =
(instance, token, forward_token);=0A-=0A-#ifdef GIVEINFO=0A- =
tv_current =3D timerlist_nano_current_get ();=0A- =
tv_diff =3D tv_current - tv_old;=0A- tv_old =3D =
tv_current;=0A- log_printf (instance->totemsrp_log_level_de=
bug,=0A- "I held %0.4f ms\n",=0A- =
((float)tv_diff) / 1000000.0);=0A-#endif=0A- =
if (instance->memb_state =3D=3D MEMB_STATE_OPERATIONAL) {=0A- =
messages_deliver_to_app (instance, 0,=0A- =
instance->my_high_seq_received);=0A- }=0A-=0A =
/*=0A- * Deliver messages after token =
has been transmitted=0A- * to improve performance=
=0A+ * delay token processing for nodes with delivery =
congestion=0A+ * If a node is processing its delivered =
messages slower than=0A+ * other nodes (or one app =
is slower to receive than another app)=0A+ * then =
our delivery queues will get congested. The solution here=0A+ =
* is to delay the reception of the token so that the applications=0A+ =
* get time to process their new messages.=0A =
*/=0A- reset_token_timeout (instance); // REVIEWED=0A- =
reset_token_retransmit_timeout (instance); // REVIEWED=0A- =
if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.r=
ep) &&=0A- instance->my_token_held =3D=3D 1) =
{=0A-=0A- start_token_hold_retransmit_timeout=
(instance);=0A+ if (instance->delivery_congested =
=3D=3D 1) {=0A+ start_orf_token_delay_timeout =
(instance, instance->totem_config->token_timeout - 250);=0A+ =
} else {=0A+ complete_orf_token_processing =
(instance);=0A }=0A-=0A- token_callb=
acks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);=0A+ =
return (0);=0A }=0A break;=0A }=0A =0A- if =
((forward_token)=0A+ if ((instance->forward_token)=0A && =
instance->use_heartbeat) {=0A reset_heartbeat_timeout(instance);=
=0A }=0A@@ -3608,6 +3666,59 @@ printf ("token seq %d\n", token->seq);=
=0A }=0A =0A return (0);=0A+=0A+}=0A+=0A+static void complete_or=
f_token_processing (void *data)=0A+{=0A+ struct totemsrp_instance =
*instance =3D data;=0A+ struct orf_token *token =3D NULL;=0A+#ifdef =
GIVEINFO=0A+ unsigned long long tv_current;=0A+ unsigned long long =
tv_diff;=0A+#endif=0A+=0A+ token =3D (struct orf_token *)instance->orf=
_token_storage;=0A+=0A+ totemrrp_send_flush (instance->totemrrp_context);=
=0A+ token_send (instance, token);=0A+=0A+#ifdef GIVEINFO=0A+ =
tv_current =3D timerlist_nano_current_get ();=0A+ tv_diff =3D =
tv_current - tv_old;=0A+ tv_old =3D tv_current;=0A+ log_printf =
(instance->totemsrp_log_level_debug,=0A+ "I held %0.4f =
ms\n",=0A+ ((float)tv_diff) / 1000000.0);=0A+#endif=0A+ if =
(instance->memb_state =3D=3D MEMB_STATE_OPERATIONAL) {=0A+ =
messages_deliver_to_app (instance, 0,=0A+ instance->m=
y_high_seq_received);=0A+ }=0A+=0A+ /*=0A+ * Deliver =
messages after token has been transmitted=0A+ * to improve performance=
=0A+ */=0A+ reset_token_timeout (instance); // REVIEWED=0A+ reset_token=
_retransmit_timeout (instance); // REVIEWED=0A+ if (totemip_equal(&instance=
->my_id.addr[0], &instance->my_ring_id.rep) &&=0A+ instance->m=
y_token_held =3D=3D 1) {=0A+=0A+ start_token_hold_retransmit=
_timeout (instance);=0A+ }=0A+=0A+ token_callbacks_execute =
(instance, TOTEM_CALLBACK_TOKEN_SENT);=0A+=0A+=0A+ if ((instance->forw=
ard_token)=0A+ && instance->use_heartbeat) {=0A+ =
reset_heartbeat_timeout(instance);=0A+ }=0A+ else {=0A+ =
cancel_heartbeat_timeout(instance);=0A+ }=0A+ return;=0A }=0A =0A static =
void messages_deliver_to_app (=0Adiff --git a/include/corosync/coroipcs.h =
b/include/corosync/coroipcs.h=0Aindex 3838af8..9904609 100644=0A--- =
a/include/corosync/coroipcs.h=0A+++ b/include/corosync/coroipcs.h=0A@@ =
-116,6 +116,7 @@ struct coroipcs_init_state_v2 {=0A ...) =
__attribute__((format(printf, 5, 6)));=0A int log_subsys_id;=0A =
void (*stats_decrement_value) (hdb_handle_t handle, const char* name);=0A+ =
void (*ipc_queue_state_change) (int queue_state);=0A };=0A =0A extern void =
coroipcs_ipc_init (=0A-- =0A1.6.3.4=0A=0A
--=__Part9FB5CDB0.0__=--
More information about the Openais
mailing list