[Openais] evt update for retained event recovery on config change

Mark Haverkamp markh at osdl.org
Fri Sep 24 10:44:59 PDT 2004


Steve,

Here is an update to the evt code.  It contains configuration-change
code to handle distributing retained events, using the token
callback code.  It seems to handle config changes at this time.  It
doesn't really handle merging of partitions that have operated
independently for a while, but Daniel and I have some ideas for this to
be added later.
It also has the start of the code to handle channel opens via exec
handlers (channel close, freeing data, etc. are still TBD).

I changed the token callback handle to be an unsigned long since it is
really a pointer.  I don't think that an int is guaranteed to be
pointer-sized, but an unsigned long is.

Mark.
-- 
Mark Haverkamp <markh at osdl.org>
-------------- next part --------------
===== exec/evt.c 1.5 vs edited =====
--- 1.5/exec/evt.c	2004-09-18 00:01:48 -07:00
+++ edited/exec/evt.c	2004-09-24 10:32:45 -07:00
@@ -34,7 +34,6 @@
 //#define DEBUG
 //#define EVT_EVENT_LIST_CHECK
 //#define EVT_ALLOC_CHECK
-//#define NO_DUPLICATES
 #include <sys/types.h>
 #include <malloc.h>
 #include <errno.h>
@@ -71,7 +70,7 @@
 static int lib_evt_event_data_get(struct conn_info *conn_info, 
 		void *message);
 static int evt_conf_change(
-		enum gmi_configuration_type configuration_type,	
+		enum gmi_configuration_type configuration_type,
 		struct sockaddr_in *member_list, int member_list_entries,
 		struct sockaddr_in *left_list, int left_list_entries,
 		struct sockaddr_in *joined_list, int joined_list_entries);
@@ -133,11 +132,13 @@
 
 	
 static int evt_remote_evt(void *msg, struct in_addr source_addr);
+static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr);
 static int evt_remote_chan_op(void *msg, struct in_addr source_addr);
 
 static int (*evt_exec_handler_fns[]) (void *m, struct in_addr s) = {
 	evt_remote_evt,
-	evt_remote_chan_op
+	evt_remote_chan_op,
+	evt_remote_recovery_evt
 };
 
 struct service_handler evt_service_handler = {
@@ -172,6 +173,33 @@
  */
 static DECLARE_LIST_INIT(ci_head);
 
+/*
+ * Structure to track pending channel open requests.
+ * 	ocp_async:			1 for async open
+ * 	ocp_invocation:		invocation for async open
+ * 	ocp_chan_name:		requested channel
+ * 	ocp_conn_info:		conn_info for returning to the library.
+ * 	ocp_open_flag:		channel open flags
+ * 	ocp_timer_handle:	timer handle for sync open
+ * 	ocp_entry:			list entry for pending open list.
+ */
+struct open_chan_pending {
+	int					ocp_async;
+	SaInvocationT		ocp_invocation;
+	SaNameT				ocp_chan_name;
+	struct conn_info	*ocp_conn_info;
+	SaEvtChannelOpenFlagsT	ocp_open_flag;
+	poll_timer_handle	ocp_timer_handle;
+	uint32_t			ocp_c_handle;
+	struct list_head	ocp_entry;
+};
+	
+/*
+ * list of pending channel opens
+ */
+static DECLARE_LIST_INIT(open_pending);
+static void chan_open_timeout(void *data);
+
 #define min(a,b) ((a) < (b) ? (a) : (b))
 
 /*
@@ -320,13 +348,54 @@
  * mn_started:			Indicates that event service has started
  * 						on this node.
  * mn_next:				pointer to the next node in the hash chain.
+ * mn_entry:			List of all nodes.
  */
 struct member_node_data {
+	struct in_addr		mn_node_addr;
 	SaClmClusterNodeT	mn_node_info;
 	SaEvtEventIdT		mn_last_evt_id;
 	SaClmNodeIdT		mn_started;
 	struct member_node_data	*mn_next;
+	struct list_head	mn_entry;
 };
+DECLARE_LIST_INIT(mnd);
+
+/*
+ * Global variables used by the event service
+ *
+ * base_id:			Next event ID to assign
+ * my_node_id:		My cluster node id
+ * in_cfg_change:	Config change occurred.  Figure out who sends retained evts.
+ * 					cleared when retained events have been delivered.
+ * total_members:	how many members in this cluster
+ * checked_in:		keep track during config change.
+ * any_joined:		did any nodes join on this change?
+ * recovery_node:	True if we're the recovery node.
+ * tok_call_handle:	gmi token callback handle for recovery.
+ * next_retained:	pointer to next retained message to send during recovery.
+ *
+ */
+static SaEvtEventIdT base_id = 0;
+static SaClmNodeIdT  my_node_id = 0;
+static int			 in_cfg_change = 0;
+static int			 total_members = 0;
+static int 			 checked_in = 0;
+static int			 any_joined = 0;
+static int			 recovery_node = 0;
+static unsigned long tok_call_handle = 0;
+static struct list_head *next_retained = 0;
+
+/*
+ * Compare two names.  returns non-zero on match.
+ */
+static inline int name_match(SaNameT *n1, SaNameT *n2) 
+{
+	if (n1->length != n2->length) {
+		return 0;
+	}
+
+	return (memcmp(n1->value, n2->value, n1->length) == 0);
+}
 
 /*
  * Get the time of day and convert to nanoseconds
@@ -471,7 +540,6 @@
 			SaTimeT timeout, struct event_svr_channel_instance **eci,
 			struct libevt_ci *esip)
 {
-	struct event_svr_channel_instance *ecp;
 	struct req_evt_chan_command cpkt;
 	struct iovec chn_iovec;
 	int res;
@@ -494,22 +562,19 @@
 	 * it's an error since we're notified of channels being created and
 	 * opened.
 	 */
-	if (flgs & SA_EVT_CHANNEL_CREATE) {
-		*eci = create_channel(cn);
-		ecp = *eci;
-	} else {
+	if (!(flgs & SA_EVT_CHANNEL_CREATE)) {
 		ret = SA_ERR_NOT_EXIST;
 		goto chan_open_end;
 	}
 
 	/*
-	 * create the channel packet to send. Tell the rest of the cluster
-	 * that we've created the channel.
+	 * create the channel packet to send. Tell the cluster
+	 * to create the channel.
 	 */
 	memset(&cpkt, 0, sizeof(cpkt));
 	cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
 	cpkt.chc_head.size = sizeof(cpkt);
-	cpkt.chc_op = MESSAGE_REQ_EVT_OPEN_CHANNEL;
+	cpkt.chc_op = EVT_OPEN_CHAN_OP;
 	cpkt.u.chc_chan = *cn;
 	chn_iovec.iov_base = &cpkt;
 	chn_iovec.iov_len = cpkt.chc_head.size;
@@ -523,36 +588,31 @@
 
 }
 
-#ifdef NO_DUPLICATES
 /*
- * Node data access functions.  Used to detect and filter duplicate
+ * Node data access functions.  Used to keep track of event IDs
  * delivery of messages.
  *
  * add_node: 	Add a new member node to our list.
  * remove_node:	Remove a node that left membership.
  * find_node:	Given the node ID return a pointer to node information.
  *
- * TODO: There is a problem when receiving config updates.  When we get the
- * TODO:	update, the cluster node table hasn't been updated yet and we
- * TODO:	can't find the node to put in this list.
- *
  */
 #define NODE_HASH_SIZE 256
 static struct member_node_data *nl[NODE_HASH_SIZE] = {0};
 inline int 
-hash_node_id(SaClmNodeIdT node_id)
+hash_sock_addr(struct in_addr addr)
 {
-	return node_id & (NODE_HASH_SIZE - 1);
+	return addr.s_addr & (NODE_HASH_SIZE - 1);
 }
 
-static struct member_node_data **lookup_node(SaClmNodeIdT node_id)
+static struct member_node_data **lookup_node(struct in_addr addr)
 {
-	int index = hash_node_id(node_id);
+	int index = hash_sock_addr(addr);
 	struct member_node_data **nlp;
 
 	nlp = &nl[index];
 	for (nlp = &nl[index]; *nlp; nlp = &((*nlp)->mn_next)) {
-		if ((*nlp)->mn_node_info.nodeId == node_id) {
+		if ((*nlp)->mn_node_addr.s_addr == addr.s_addr) {
 			break;
 		}
 	}
@@ -561,11 +621,11 @@
 }
 
 static struct member_node_data *
-evt_find_node(SaClmNodeIdT node_id)
+evt_find_node(struct in_addr addr)
 {
 	struct member_node_data **nlp;
 
-	nlp = lookup_node(node_id);
+	nlp = lookup_node(addr);
 
 	if (!nlp) {
 		log_printf(LOG_LEVEL_DEBUG, "find_node: Got NULL nlp?\n");
@@ -576,13 +636,13 @@
 }
 
 static SaErrorT
-evt_add_node(SaClmClusterNodeT *ni) 
+evt_add_node(struct in_addr addr, SaClmClusterNodeT *cn) 
 {
 	struct member_node_data **nlp;
 	struct member_node_data *nl;
 	SaErrorT err = SA_ERR_EXIST;
 
-	nlp = lookup_node(ni->nodeId);
+	nlp = lookup_node(addr);
 
 	if (!nlp) {
 		log_printf(LOG_LEVEL_DEBUG, "add_node: Got NULL nlp?\n");
@@ -601,27 +661,25 @@
 	if (nl) {
 		memset(nl, 0, sizeof(*nl));
 		err = SA_OK;
+		nl->mn_node_addr = addr;
+		nl->mn_started = 1;
 	}
-
-	nl->mn_node_info.nodeId = ni->nodeId;
-	nl->mn_node_info.nodeAddress = ni->nodeAddress;
-	nl->mn_node_info.nodeName = ni->nodeName;
-	nl->mn_node_info.clusterName = ni->clusterName;
-	nl->mn_node_info.member = ni->member;
-	nl->mn_node_info.bootTimestamp = ni->bootTimestamp;
+	list_init(&nl->mn_entry);
+	list_add(&nl->mn_entry, &mnd);
+	nl->mn_node_info = *cn;
 
 an_out:
 	return err;
 }
 
 static SaErrorT
-evt_remove_node(SaClmClusterNodeT *ni) 
+evt_remove_node(struct in_addr addr) 
 {
 	struct member_node_data **nlp;
 	struct member_node_data *nl;
 	SaErrorT err = SA_ERR_NOT_EXIST;
 
-	nlp = lookup_node(ni->nodeId);
+	nlp = lookup_node(addr);
 
 	if (!nlp) {
 		log_printf(LOG_LEVEL_DEBUG, "remove_node: Got NULL nlp?\n");
@@ -634,6 +692,7 @@
 
 	nl = *nlp;
 
+	list_del(&nl->mn_entry);
 	*nlp = nl->mn_next;
 	free(*nlp);
 	err = SA_OK;
@@ -641,92 +700,155 @@
 an_out:
 	return err;
 }
-#endif
+
+/*
+ * Find the oldest node in the membership.  This is the one we choose to 
+ * perform some cluster-wide functions like distributing retained events.
+ */
+static struct member_node_data* oldest_node()
+{
+	struct list_head *l;
+	struct member_node_data *mn = 0;
+	struct member_node_data *oldest = 0;
+
+	for (l = mnd.next; l != &mnd; l = l->next) {
+		mn = list_entry(l, struct member_node_data, mn_entry);
+		if (mn->mn_started == 0) {
+			continue;
+		}
+		if ((oldest == NULL) || 
+				(mn->mn_node_info.bootTimestamp <
+					 oldest->mn_node_info.bootTimestamp )) {
+			oldest = mn;
+		} else if (mn->mn_node_info.bootTimestamp ==
+					 oldest->mn_node_info.bootTimestamp) {
+			if (mn->mn_node_info.nodeId < oldest->mn_node_info.nodeId) {
+				oldest = mn;
+			}
+		}
+	}
+	return oldest;
+}
 
 
 /*
- * Send our retained events to the specified node id.
- * Called when a remote event server starts up and opens a channel
- * that has retained events that we published.
- *
- * TODO: Fill me in
+ * Token callback routine.  Send as many mcasts as possible to distribute
+ * retained events on a config change.
  */
-static void send_retained(SaNameT *cn, SaClmNodeIdT node_id)
+static int send_next_retained(void *data)
 {
-	log_printf(LOG_LEVEL_DEBUG, 
-					"TODO: Send retained messages for %s to 0x%x\n", 
-					cn->value, node_id);
+	struct req_evt_chan_command cpkt;
+	struct iovec chn_iovec;
+	struct event_data *evt;
+	int res;
+
+	if (in_cfg_change && recovery_node) {
+		/*
+		 * Process messages.  When we're done, send the done message
+		 * to the nodes.
+		 */
+		for (;next_retained != &retained_list; 
+								next_retained = next_retained->next) {
+			log_printf(LOG_LEVEL_DEBUG, "Sending next retained event\n");
+			evt = list_entry(next_retained, struct event_data, ed_retained);
+			evt->ed_event.led_head.id = MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA;
+			chn_iovec.iov_base = &evt->ed_event;
+			chn_iovec.iov_len = evt->ed_event.led_head.size;
+			res = gmi_mcast(&aisexec_groupname, &chn_iovec, 1, 
+				GMI_PRIO_RECOVERY);
+
+			if (res != 0) {
+			/*
+			 * Try again later.
+			 */
+				return -1;
+			}
+		}
+		log_printf(LOG_LEVEL_DEBUG, "DONE Sending retained events\n");
+		memset(&cpkt, 0, sizeof(cpkt));
+		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+		cpkt.chc_head.size = sizeof(cpkt);
+		cpkt.chc_op = EVT_CONF_DONE;
+		chn_iovec.iov_base = &cpkt;
+		chn_iovec.iov_len = cpkt.chc_head.size;
+		res = gmi_mcast (&aisexec_groupname, &chn_iovec, 1, 
+												GMI_PRIO_RECOVERY);
+	}
+	tok_call_handle = 0;
+	return 0;
 }
 
 /*
- * purge retained events from the specified node id.
- * Called when a remote event server terminates.
- *
- * TODO: Fill me in
+ * Send our retained events. If we've been chosen as the recovery node,
+ * kick off the process of sending retained events.
  */
-static void purge_retained(SaClmNodeIdT node_id)
+static void send_retained()
 {
-	log_printf(LOG_LEVEL_DEBUG, "TODO: Purge retained messages for node 0x%x\n", node_id);
+	struct req_evt_chan_command cpkt;
+	struct iovec chn_iovec;
+	int res;
+
+	if (list_empty(&retained_list) || !any_joined) {
+		memset(&cpkt, 0, sizeof(cpkt));
+		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+		cpkt.chc_head.size = sizeof(cpkt);
+		cpkt.chc_op = EVT_CONF_DONE;
+		chn_iovec.iov_base = &cpkt;
+		chn_iovec.iov_len = cpkt.chc_head.size;
+		log_printf(LOG_LEVEL_DEBUG, "No messages to send\n");
+		res = gmi_mcast (&aisexec_groupname, &chn_iovec, 1, 
+											GMI_PRIO_RECOVERY);
+	} else {
+		log_printf(LOG_LEVEL_DEBUG, 
+					"Start sending retained messages\n");
+		recovery_node = 1;
+		next_retained = retained_list.next;
+		res = gmi_token_callback_create(&tok_call_handle, send_next_retained,
+				NULL);
+	}
+	if (res != 0) {
+		log_printf(LOG_LEVEL_ERROR, "ERROR sending evt recovery data\n");
+	}
 }
 
-#ifdef NO_DUPLICATES
 /*
- * See if we've already seen a message with this ID from 
- * this node.  Return 0 for not seen, 1 for seen.
- * We also bump the last seen event for the next time. So only call this 
- * once per event being proccessed.
+ * keep track of the last event ID from a node.
+ * If we get an event ID less than our last, we've already
+ * seen it.  It's probably a retained event being sent to 
+ * a new node.
  */
-static int is_duplicate_event(struct lib_event_data *evtpkt, 
-				SaClmClusterNodeT *cn)
+static int check_last_event(struct lib_event_data *evtpkt, 
+				struct in_addr addr)
 {
 	struct member_node_data *nd;
+	SaClmClusterNodeT *cn;
 
 
-	/*
-	 * Look up the node and check the largest event ID that we've seen.
-	 * Since event IDs are increasing and are delivered in order from
-	 * a given publisher, we just need to check that this ID is
-	 * greater than the last one that we saw.
-	 */
-	nd = evt_find_node(evtpkt->led_publisher_node_id);
+	nd = evt_find_node(addr);
 	if (!nd) {
 		log_printf(LOG_LEVEL_DEBUG, "Node ID 0x%x not found for event %llx\n",
 				evtpkt->led_publisher_node_id, evtpkt->led_event_id);
-		evt_add_node(cn);
-		return 0;
+		cn = clm_get_by_nodeid(addr);
+		if (!cn) {
+			log_printf(LOG_LEVEL_DEBUG, 
+					"Cluster Node 0x%x not found for event %llx\n",
+				evtpkt->led_publisher_node_id, evtpkt->led_event_id);
+		} else {
+			evt_add_node(addr, cn);
+			nd = evt_find_node(addr);
+		}
 	}
 
-	/*
-	 * This shouldn't happen
-	 */
-	if ((nd->mn_last_evt_id >= evtpkt->led_event_id) && 
-						(evtpkt->led_event_id & 0xffffffffull) != 0ull) {
-		log_printf(LOG_LEVEL_NOTICE, 
-			"Event out of order for node ID 0x%x\n",
-				evtpkt->led_publisher_node_id);
-		log_printf(LOG_LEVEL_NOTICE, 
-			"last event ID 0x%llx, current event ID 0x%llx\n",
-				nd->mn_last_evt_id, evtpkt->led_event_id);
-		return 1;
+	if (!nd) {
+		return 0;
 	}
 
-	/*
-	 * This is probably OK, but here for debugging purposes
-	 */
-	if(((nd->mn_last_evt_id & 0xffffffff) > 0) && (nd->mn_last_evt_id < 
-										(evtpkt->led_event_id -1))) {
-		log_printf(LOG_LEVEL_NOTICE,
-			"Event sequence skipped for node ID 0x%x\n",
-			evtpkt->led_publisher_node_id);
-		log_printf(LOG_LEVEL_NOTICE, 
-			"last event ID 0x%llx, current event ID 0x%llx\n",
-				nd->mn_last_evt_id, evtpkt->led_event_id);
+	if ((nd->mn_last_evt_id < evtpkt->led_event_id)) {
+		nd->mn_last_evt_id = evtpkt->led_event_id;
+		return 0;
 	}
-	nd->mn_last_evt_id = evtpkt->led_event_id;
-
-	return 0;
+	return 1;
 }
-#endif
 
 /*
  * Send a message to the app to wake it up if it is polling
@@ -749,7 +871,6 @@
  * upper 32 bits of the event ID to make sure that we can generate a cluster
  * wide unique event ID for a given event.
  */
-static SaEvtEventIdT base_id = 0;
 SaErrorT set_event_id(SaClmNodeIdT node_id)
 {
 	SaErrorT err = SA_OK;
@@ -766,10 +887,12 @@
 	return SA_OK;
 }
 
-/*
+
+#ifdef EVT_ALLOC_CHECK
 static uint32_t evt_alloc = 0;
 static uint32_t evt_free = 0;
-*/
+#endif
+
 /*
  * Free up an event structure if it isn't being used anymore.
  */
@@ -788,7 +911,7 @@
 #ifdef EVT_ALLOC_CHECK
 	evt_free++;
 	if ((evt_free % 1000) == 0) {
-			log_printf(LOG_LEVEL_NOTICE, "evt alloc: %u, evt free: %u\n",
+			log_printf(LOG_LEVEL_NOTICE, "evt alloc: %u, *evt free: %u\n",
 							evt_alloc, evt_free);
 	}
 #endif
@@ -805,6 +928,15 @@
 	struct event_data *edp = data;
 	log_printf(LOG_LEVEL_DEBUG, "Event ID %llx expired\n", 
 					edp->ed_event.led_event_id);
+	/*
+	 * adjust next_retained if we're in recovery and 
+	 * were in charge of sending retained events.
+	 */
+	if (in_cfg_change && recovery_node) {
+		if (next_retained == &edp->ed_retained) {
+			next_retained = edp->ed_retained.next;
+		}
+	}
 	list_del(&edp->ed_retained);
 	list_init(&edp->ed_retained);
 	free_event_data(edp);
@@ -1202,13 +1334,13 @@
 	 * Queue the event for delivery
 	 */
 	if (do_deliver_event) {
-		evt->ed_ref_count++;
 		ep = malloc(sizeof(*ep));
 		if (!ep) {
 			log_printf(LOG_LEVEL_WARNING, 
 						"Memory allocation error, can't deliver event\n");
 			return;
 		}
+		evt->ed_ref_count++;
 		ep->cel_chan_handle = eco->eco_lib_handle;
 		ep->cel_sub_id = ecs->ecs_sub_id;
 		list_init(&ep->cel_entry);
@@ -1293,7 +1425,7 @@
 #ifdef EVT_ALLOC_CHECK
 	evt_alloc++;
 	if ((evt_alloc % 1000) == 0) {
-			log_printf(LOG_LEVEL_NOTICE, "evt alloc: %u, evt free: %u\n",
+			log_printf(LOG_LEVEL_NOTICE, "*evt alloc: %u, evt free: %u\n",
 							evt_alloc, evt_free);
 	}
 #endif
@@ -1416,6 +1548,9 @@
 	struct event_svr_channel_instance 	*eci;
 	struct event_svr_channel_open		*eco;
 	struct libevt_ci *esip = &conn_info->ais_ci.u.libevt_ci;
+	struct open_chan_pending *ocp;
+	int msec_in_future;
+	int ret;
 
 	req = message;
 
@@ -1430,28 +1565,72 @@
 			req->ico_channel_name.length,
 			req->ico_channel_name.value);
 	/*
-	 * Create a handle to give back to the caller to associate
-	 * with this channel open instance.
+	 * Open the channel.
+	 *
 	 */
-	error = saHandleCreate(&esip->esi_hdb, sizeof(*eco), &handle);
+	error = evt_open_channel(&req->ico_channel_name, 
+			req->ico_open_flag, req->ico_timeout, &eci, esip);
+
 	if (error != SA_OK) {
 		goto open_return;
 	}
-	error = saHandleInstanceGet(&esip->esi_hdb, handle, (void**)&eco);
-	if (error != SA_OK) {
+
+	/*
+	 * See if we found the channel.  If not, then the request is pending
+	 * and we set up for the command completion and just return.  The command
+	 * will be completed when the exec handler receives the open request, or
+	 * if the command times out.
+	 */
+	if (eci) {
+		goto found_channel;
+	}
+
+	ocp = malloc(sizeof(struct open_chan_pending));
+	if (!ocp) {
+		error = SA_ERR_NO_MEMORY;
 		goto open_return;
 	}
 
+	ocp->ocp_async = 0;
+	ocp->ocp_invocation = 0;
+	ocp->ocp_chan_name = req->ico_channel_name;
+	ocp->ocp_open_flag = req->ico_open_flag;
+	ocp->ocp_conn_info = conn_info;
+	ocp->ocp_timer_handle = 0;
+	list_init(&ocp->ocp_entry);
+	list_add_tail(&ocp->ocp_entry, &open_pending);
+	if (req->ico_timeout != 0) {
+		msec_in_future = (uint32_t)(req->ico_timeout / 1000000ULL);
+		ret = poll_timer_add(aisexec_poll_handle,
+				msec_in_future,
+				ocp,
+				chan_open_timeout,
+				&ocp->ocp_timer_handle);
+		if (ret != 0) {
+			log_printf(LOG_LEVEL_WARNING, 
+					"Error setting timeout for open channel %s\n",
+					req->ico_channel_name.value);
+		}
+	}
+	return 0;
+
+
 	/*
-	 * Open the channel.
-	 *
+	 * The channel already exists, we can return the information to the
+	 * library right away.
 	 */
-	error = evt_open_channel(&req->ico_channel_name, 
-			req->ico_open_flag, req->ico_timeout, &eci, esip);
-
+found_channel:
+	/*
+	 * Create a handle to give back to the caller to associate
+	 * with this channel open instance.
+	 */
+	error = saHandleCreate(&esip->esi_hdb, sizeof(*eco), &handle);
+	if (error != SA_OK) {
+		goto open_return;
+	}
+	error = saHandleInstanceGet(&esip->esi_hdb, handle, (void**)&eco);
 	if (error != SA_OK) {
-		saHandleDestroy(&esip->esi_hdb, handle);
-		goto open_put;
+		goto open_return;
 	}
 
 	/*
@@ -1472,7 +1651,6 @@
 	 * respond back with a handle to access this channel
 	 * open instance for later subscriptions, etc.
 	 */
-open_put:
 	saHandleInstancePut(&esip->esi_hdb, handle);
 open_return:
 	res.ico_head.size = sizeof(res);
@@ -1855,13 +2033,10 @@
 			req->iec_event_id,
 			req->iec_channel_handle);
 
-	/*
-	 * TODO: Add clear retention time code here
-	 */
 	memset(&cpkt, 0, sizeof(cpkt));
 	cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
 	cpkt.chc_head.size = sizeof(cpkt);
-	cpkt.chc_op = MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME;
+	cpkt.chc_op = EVT_CLEAR_RET_OP;
 	cpkt.u.chc_event_id = req->iec_event_id;
 	rtn_iovec.iov_base = &cpkt;
 	rtn_iovec.iov_len = cpkt.chc_head.size;
@@ -1950,18 +2125,21 @@
  * received for each node for the detection of duplicate events.
  */
 static int evt_conf_change(
-	enum gmi_configuration_type configuration_type,	
-	struct sockaddr_in *member_list, int member_list_entries,
-	struct sockaddr_in *left_list, int left_list_entries,
-	struct sockaddr_in *joined_list, int joined_list_entries)
+		enum gmi_configuration_type configuration_type,
+		struct sockaddr_in *member_list, int member_list_entries,
+		struct sockaddr_in *left_list, int left_list_entries,
+		struct sockaddr_in *joined_list, int joined_list_entries)
 {
 	struct in_addr my_node = {SA_CLM_LOCAL_NODE_ID};
 	SaClmClusterNodeT *cn;
-#ifdef NO_DUPLICATES
 	static int first = 1;
 	struct sockaddr_in *add_list;
-	SaErrorT error;
+	struct member_node_data *md;
 	int add_count;
+	struct req_evt_chan_command cpkt;
+	struct iovec chn_iovec;
+	int res;
+
 
 	log_printf(LOG_LEVEL_DEBUG, "Evt conf change\n");
 	log_printf(LOG_LEVEL_DEBUG, "m %d, j %d, l %d\n", 
@@ -1969,10 +2147,18 @@
 					joined_list_entries,
 					left_list_entries);
 	/*
+	 * Stop any recovery callbacks in progress.
+	 */
+	if (tok_call_handle) {
+		gmi_token_callback_destroy(tok_call_handle);
+		tok_call_handle = 0;
+	}
+
+	/*
 	 * Don't seem to be able to tell who joined if we're just coming up. Not all
 	 * nodes show up in the join list.  If this is the first time through,
 	 * choose the members list to use to add nodes, after that use the join
-	 * list.  ALways use the left list for removing nodes.
+	 * list.  Always use the left list for removing nodes.
 	 */
 	if (first) {
 			add_list = member_list;
@@ -1984,62 +2170,96 @@
 	}
 
 	while (add_count--) {
-			log_printf(LOG_LEVEL_DEBUG, 
-						"Look up Cluster node for %s\n",
+		/*
+		 * If we've seen this node before, send out the last event ID 
+		 * that we've seen from him.  He will set his base event ID to
+		 * the highest one seen.
+		 */
+		md = evt_find_node(add_list->sin_addr);
+		if (md != NULL) {
+			if (!md->mn_started) {
+				log_printf(LOG_LEVEL_DEBUG, "Send set evt ID %llx to %s\n",
+					md->mn_last_evt_id, inet_ntoa(add_list->sin_addr));
+				md->mn_started = 1;
+				memset(&cpkt, 0, sizeof(cpkt));
+				cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+				cpkt.chc_head.size = sizeof(cpkt);
+				cpkt.chc_op = EVT_SET_ID_OP;
+				cpkt.u.chc_set_id.chc_addr = add_list->sin_addr;
+				cpkt.u.chc_set_id.chc_last_id = md->mn_last_evt_id;
+				chn_iovec.iov_base = &cpkt;
+				chn_iovec.iov_len = cpkt.chc_head.size;
+				res = gmi_mcast (&aisexec_groupname, &chn_iovec, 1, 
+														GMI_PRIO_RECOVERY);
+				if (res != 0) {
+					log_printf(LOG_LEVEL_WARNING, 
+						"Unable to send event id to %s\n", 
 						inet_ntoa(add_list->sin_addr));
-			cn = clm_get_by_nodeid(add_list->sin_addr);
-			if (!cn) {
-				log_printf(LOG_LEVEL_DEBUG, 
-							"No Cluster node found for %s\n",
-							inet_ntoa(add_list->sin_addr));
-			} else {
-				log_printf(LOG_LEVEL_DEBUG, "Adding node: %s(0x%x)\n",
-								cn->nodeName.value, cn->nodeId);
-				error = evt_add_node(cn);
-				if (error != SA_OK) {
-					log_printf(LOG_LEVEL_DEBUG, 
-						"Can't add Cluster node at %s\n",
-								inet_ntoa(add_list->sin_addr));
 				}
 			}
-			cn++;
+		}
+		add_list++;
 	}
 
 	while (left_list_entries--) {
+		md = evt_find_node(left_list->sin_addr);
+		if (md == 0) {
 			log_printf(LOG_LEVEL_DEBUG, 
-						"Look up Cluster node for %s\n",
-						inet_ntoa(left_list->sin_addr));
-			cn = clm_get_by_nodeid(left_list->sin_addr);
-			if (!cn) {
-				log_printf(LOG_LEVEL_DEBUG, 
-					"No Cluster node found for %s\n",
-						inet_ntoa(left_list->sin_addr));
-			} else {
-				log_printf(LOG_LEVEL_DEBUG, "Removing node: %s(0x%x)\n",
-								cn->nodeName.value, cn->nodeId);
-				error = evt_remove_node(cn);
-				if (error != SA_OK) {
-					log_printf(LOG_LEVEL_DEBUG, 
-						"Can't add Cluster node at %s\n",
-								inet_ntoa(left_list->sin_addr));
-				}
-			}
-			cn++;
+					"Can't find cluster node at %s\n",
+							inet_ntoa(left_list->sin_addr));
+		/*
+		 * Mark this one as down.
+		 */
+		} else {
+			log_printf(LOG_LEVEL_DEBUG, "cluster node at %s down\n",
+							inet_ntoa(left_list->sin_addr));
+			md->mn_started = 0;
+		}
+		left_list++;
 	}
-#endif
 
 	/*
 	 * Set the base event id
 	 */
+	cn = clm_get_by_nodeid(my_node);
 	if (!base_id) {
-		cn = clm_get_by_nodeid(my_node);
-		log_printf(LOG_LEVEL_DEBUG, "My node ID 0x%x\n");
-		set_event_id(cn->nodeId);
+		log_printf(LOG_LEVEL_DEBUG, "My node ID 0x%x\n", cn->nodeId);
+		my_node_id = cn->nodeId;
+		set_event_id(my_node_id);
 	}
 
+
+	/*
+	 * Notify that a config change happened.  The exec handler will
+	 * then determine what to do.
+	 */
 	if (configuration_type == GMI_CONFIGURATION_REGULAR) {
-		gmi_recovery_plug_unplug (evt_recovery_plug_handle);
+		if (in_cfg_change) {
+			log_printf(LOG_LEVEL_DEBUG, 
+					"Already in config change, Starting over, m %d, c %d\n",
+					total_members, checked_in);
+		}
+
+		in_cfg_change = 1;
+		total_members = member_list_entries;
+		checked_in = 0;
+		any_joined = joined_list_entries;
+
+		log_printf(LOG_LEVEL_DEBUG, "Send EVT_CONF_CHANGE\n");
+		memset(&cpkt, 0, sizeof(cpkt));
+		cpkt.chc_head.id = MESSAGE_REQ_EXEC_EVT_CHANCMD;
+		cpkt.chc_head.size = sizeof(cpkt);
+		cpkt.chc_op = EVT_CONF_CHANGE;
+		chn_iovec.iov_base = &cpkt;
+		chn_iovec.iov_len = cpkt.chc_head.size;
+		res = gmi_mcast (&aisexec_groupname, &chn_iovec, 1, GMI_PRIO_RECOVERY);
+		if (res != 0) {
+			log_printf(LOG_LEVEL_WARNING, 
+				"Unable to send config change notice %d\n");
+				
+		}
 	}
+
 	return 0;
 }
 
@@ -2098,17 +2318,16 @@
  */
 static int evt_exec_init(void)
 {
-
-    int res;
-
+	int res;
 	log_printf(LOG_LEVEL_DEBUG, "Evt exec init request\n");
+
 	res = gmi_recovery_plug_create (&evt_recovery_plug_handle);
 	if (res != 0) {
 		log_printf(LOG_LEVEL_ERROR,
 			"Could not create recovery plug for event service.\n");
 		return (-1);
 	}
-	log_printf(LOG_LEVEL_DEBUG, "Evt exec init request\n"); 
+
 	/*
 	 * Create an event to be sent when we have to drop messages
 	 * for an application.
@@ -2134,7 +2353,6 @@
 	dropped_event->ed_event.led_patterns_number = 1;
 	memcpy(&dropped_event->ed_event.led_body[0], 
 					&dropped_pattern, sizeof(dropped_pattern));
-
 	return 0;
 }
 
@@ -2169,7 +2387,7 @@
 	cn = clm_get_by_nodeid (source_addr);
 	if (!cn) {
 			/*
-			 * TODO: do something here when we can't find the node.
+			 * Not sure how this can happen...
 			 */
 			log_printf(LOG_LEVEL_DEBUG, "No cluster node for %s\n",
 							inet_ntoa(source_addr));
@@ -2179,26 +2397,32 @@
 	log_printf(LOG_LEVEL_DEBUG, "Cluster node ID 0x%x name %s\n",
 					cn->nodeId, cn->nodeName.value);
 	evtpkt->led_publisher_node_id = cn->nodeId;
+	evtpkt->led_in_addr = source_addr;
+	evtpkt->led_receive_time = clustTimeNow();
 
 	eci = find_channel(&evtpkt->led_chan_name);
 
 	/*
-	 * No one here has this channel open yet.  We can ignore the
-	 * message.  When someone does open the channel, any retained messages
-	 * will be sent by the originators.
+	 * If we don't know about the channel, then no one has opened it yet.
+	 * We create the channel if there is a retention time, otherwise we can
+	 * just throw it away since no one here is looking for this event.
 	 */
 	if (!eci) {
-		return 0;
+		if (evtpkt->led_retention_time) {
+			eci = create_channel(&evtpkt->led_chan_name);
+			if (!eci) {
+				log_printf(LOG_LEVEL_WARNING, "Can't create channel %s\n",
+					evtpkt->led_chan_name.value);
+					
+			}
+		} else {
+			return 0;
+		}
 	}
 
-#ifdef NO_DUPLICATES
-	/*
-	 * Check for duplicate receipt of message
-	 */
-	if (is_duplicate_event(evtpkt, cn)) {
+	if (check_last_event(evtpkt, source_addr)) {
 		return 0;
 	}
-#endif
 
 	evt = make_local_event(evtpkt, eci);
 	if (!evt) {
@@ -2247,6 +2471,202 @@
 }
 
 /*
+ * Calculate the remaining retention time of a received event during recovery
+ */
+inline SaTimeT calc_retention_time(SaTimeT retention, 
+								SaTimeT received, SaTimeT now)
+{
+	if ((received < now) && ((now - received) < retention)) {
+		return retention - (now - received);
+	} else {
+		return 0;
+	}
+}
+
+/*
+ * Receive a recovery network event message and save it in the retained list
+ */
+static int evt_remote_recovery_evt(void *msg, struct in_addr source_addr)
+{
+	/*
+	 * - retain events that have a retention time
+	 * - Find associated channel
+	 */
+	struct lib_event_data *evtpkt = msg;
+	struct event_svr_channel_instance *eci;
+	struct event_data *evt;
+	struct member_node_data *md;
+	SaTimeT now;
+
+	now = clustTimeNow();
+
+	log_printf(LOG_LEVEL_DEBUG, 
+			"Remote recovery event data received from 0x08%x\n",
+					source_addr);
+
+	if (!in_cfg_change) {
+		log_printf(LOG_LEVEL_NOTICE, 
+				"Received recovery data, not in recovery mode\n");
+		return 0;
+	}
+
+	log_printf(LOG_LEVEL_DEBUG, "Processing recovery of retained events\n");
+	if (recovery_node) {
+		log_printf(LOG_LEVEL_DEBUG, "This node is the recovery node\n");
+	}
+
+	log_printf(LOG_LEVEL_DEBUG, "(1)EVT ID: %llx, Time: %llx\n",
+			evtpkt->led_event_id, evtpkt->led_retention_time);
+	/*
+	 * Calculate remaining retention time
+	 */
+	evtpkt->led_retention_time = calc_retention_time(
+				evtpkt->led_retention_time, 
+				evtpkt->led_receive_time, 
+				now);
+
+	log_printf(LOG_LEVEL_DEBUG, 
+			"(2)EVT ID: %llx, ret: %llx, rec: %llx, now: %llx\n",
+			evtpkt->led_event_id, 
+			evtpkt->led_retention_time, evtpkt->led_receive_time, now);
+
+	/*
+	 * If we haven't seen this event yet and it has remaining time, process
+	 * the event.
+	 */
+	if (!check_last_event(evtpkt, evtpkt->led_in_addr) && 
+												evtpkt->led_retention_time) {
+		/*
+		 * See where the message came from so that we can set the 
+		 * publishing node id in the message before delivery.
+		 */
+		md = evt_find_node(evtpkt->led_in_addr);
+		if (!md) {
+				/*
+				 * Not sure how this can happen
+				 */
+				log_printf(LOG_LEVEL_DEBUG, "No node for %s\n",
+								inet_ntoa(evtpkt->led_in_addr));
+				errno = ENXIO;
+				return -1;
+		}
+		log_printf(LOG_LEVEL_DEBUG, "Cluster node ID 0x%x name %s\n",
+						md->mn_node_info.nodeId, 
+						md->mn_node_info.nodeName.value);
+
+		eci = find_channel(&evtpkt->led_chan_name);
+
+		/*
+		 * If the channel doesn't exist, then create it 
+		 * since we're in recovery mode, so that we can save this message.
+		 */
+		if (!eci) {
+			eci = create_channel(&evtpkt->led_chan_name);
+			if (!eci) {
+				log_printf(LOG_LEVEL_WARNING, "Can't create channel %s\n",
+					evtpkt->led_chan_name.value);
+					
+				return 0;
+			}
+		}
+
+		evt = make_local_event(evtpkt, eci);
+		if (!evt) {
+			log_printf(LOG_LEVEL_WARNING, 
+							"Memory allocation error, can't deliver event\n");
+			errno = ENOMEM;
+			return -1;
+		}
+			
+		retain_event(evt);
+		free_event_data(evt);
+	}
+
+	return 0;
+}
+
+
+/*
+ * Timeout handler for event channel open.
+ */
+static void chan_open_timeout(void *data)
+{
+	struct open_chan_pending *ocp = (struct open_chan_pending *)data;
+	struct res_evt_channel_open res;
+	
+	res.ico_head.size = sizeof(res);
+	res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
+	res.ico_head.error = SA_ERR_TIMEOUT;
+	libais_send_response (ocp->ocp_conn_info, &res, sizeof(res));
+	list_del(&ocp->ocp_entry);
+	free(ocp);
+}
+
+static void evt_chan_open_finish(struct open_chan_pending *ocp, 
+		struct event_svr_channel_instance *eci)
+{
+	uint32_t handle;
+	struct res_evt_channel_open res;
+	struct event_svr_channel_open *eco;
+	SaErrorT error;
+	struct libevt_ci *esip = &ocp->ocp_conn_info->ais_ci.u.libevt_ci;
+	int ret = 0;
+
+	if (!ocp->ocp_async && ocp->ocp_timer_handle) {
+		ret = poll_timer_delete(aisexec_poll_handle, ocp->ocp_timer_handle);
+		if (ret != 0 ) {
+			log_printf(LOG_LEVEL_WARNING, 
+				"Error clearing timeout for open channel of %s\n",
+							ocp->ocp_chan_name);
+		}
+	}
+
+	/*
+	 * Create a handle to give back to the caller to associate
+	 * with this channel open instance.
+	 */
+	error = saHandleCreate(&esip->esi_hdb, sizeof(*eco), &handle);
+	if (error != SA_OK) {
+		goto open_return;
+	}
+	error = saHandleInstanceGet(&esip->esi_hdb, handle, (void**)&eco);
+	if (error != SA_OK) {
+		goto open_return;
+	}
+
+	/*
+	 * Initialize and link into the global channel structure.
+	 */
+	list_init(&eco->eco_subscr);
+	list_init(&eco->eco_entry);
+	list_init(&eco->eco_instance_entry);
+	eco->eco_flags = ocp->ocp_open_flag;
+	eco->eco_channel = eci;
+	eco->eco_lib_handle = ocp->ocp_c_handle;
+	eco->eco_my_handle = handle;
+	eco->eco_conn_info = ocp->ocp_conn_info;
+	list_add_tail(&eco->eco_entry, &eci->esc_open_chans);
+	list_add_tail(&eco->eco_instance_entry, &esip->esi_open_chans);
+
+	/*
+	 * respond back with a handle to access this channel
+	 * open instance for later subscriptions, etc.
+	 */
+	saHandleInstancePut(&esip->esi_hdb, handle);
+open_return:
+	res.ico_head.size = sizeof(res);
+	res.ico_head.id = MESSAGE_RES_EVT_OPEN_CHANNEL;
+	res.ico_head.error = error;
+	res.ico_channel_handle = handle;
+	libais_send_response (ocp->ocp_conn_info, &res, sizeof(res));
+
+	if (ret == 0) {
+		list_del(&ocp->ocp_entry);
+		free(ocp);
+	}
+}
+
+/*
  * Receive and process remote event operations.
  * Used to communicate channel opens/closes, clear retention time.
  */
@@ -2255,50 +2675,145 @@
 	struct req_evt_chan_command *cpkt = msg;
 	struct in_addr local_node = {SA_CLM_LOCAL_NODE_ID};
 	SaClmClusterNodeT *cn, *my_node;
+	struct member_node_data *mn;
 	struct event_svr_channel_instance *eci;
 
 
 	log_printf(LOG_LEVEL_DEBUG, "Remote channel operation request\n");
 	my_node = clm_get_by_nodeid(local_node);
-	cn = clm_get_by_nodeid(source_addr);
 
-	/* 
-	 * can ignore messages from me.
-	if (my_node->nodeId == cn->nodeId) {
-			return 0;
+	mn = evt_find_node(source_addr);
+	if (mn == NULL) {
+		cn = clm_get_by_nodeid(source_addr);
+		if (cn == NULL) {
+			log_printf(LOG_LEVEL_WARNING, 
+					"Evt remote channel op: Node data for addr %x is NULL\n",
+					source_addr.s_addr);
+		} else {
+			evt_add_node(source_addr, cn);
+			mn = evt_find_node(source_addr);
+		}
 	}
-	 */
 
 	switch (cpkt->chc_op) {
-	case MESSAGE_REQ_EVT_OPEN_CHANNEL:
+		/*
+		 * Open channel remote command.  The open channel request may be done
+		 * in two steps.  If we don't already know about a channel when an 
+		 * application tries to open it, we send an open channel message to the
+		 * other nodes. When we receive an open channel message, we may create
+		 * the channel structure if it doesn't exist and also complete any
+		 * application open requests for the specified channel.
+		 */
+	case EVT_OPEN_CHAN_OP: {
+		struct open_chan_pending *ocp;
+		struct list_head *l, *nxt;
+
 		log_printf(LOG_LEVEL_DEBUG, "Creating channel %s for node 0x%x\n",
-						cpkt->u.chc_chan.value, cn->nodeId);
+						cpkt->u.chc_chan.value, mn->mn_node_info.nodeId);
 		eci = find_channel(&cpkt->u.chc_chan);
 
-		/*
-		 * If found, either there was a race opening a channel or
-		 * a node joined after a channel was created.  We need to send
-		 * him our retained messages to bring him up to date.
-		 */
-		if (eci) {
-			send_retained(&cpkt->u.chc_chan, cn->nodeId);
+		if (!eci) {
+			eci = create_channel(&cpkt->u.chc_chan);
+		}
+		if (!eci) {
+			log_printf(LOG_LEVEL_WARNING, "Could not create channel %s\n",
+							&cpkt->u.chc_chan.value);
 			break;
 		}
 
-		eci = create_channel(&cpkt->u.chc_chan);
-		if (!eci) {
-				log_printf(LOG_LEVEL_WARNING, "Could not create channel %s\n",
-								&cpkt->u.chc_chan.value);
+		/*
+		 * Complete any pending open requests.
+		 */
+		for (l = open_pending.next; l != &open_pending; l = nxt) {
+			nxt = l->next;
+			ocp = list_entry(l, struct open_chan_pending, ocp_entry);
+			if (name_match(&ocp->ocp_chan_name, &eci->esc_channel_name)) {
+				evt_chan_open_finish(ocp, eci);
+			}
 		}
 
 		break;
-	case MESSAGE_REQ_EVT_CLOSE_CHANNEL:
+	}
+
+	/*
+	 * Handle a channel close.
+	 */
+	case EVT_CLOSE_CHAN_OP:
 		break;
-	case MESSAGE_REQ_EVT_CLEAR_RETENTIONTIME:
+
+	/*
+	 * saEvtEventRetentionTimeClear handler.
+	 */
+	case EVT_CLEAR_RET_OP:
 		log_printf(LOG_LEVEL_DEBUG, "Clear retention time request %llx\n",
 				cpkt->u.chc_event_id);	
 		clear_retention_time(cpkt->u.chc_event_id);
 		break;
+	
+	/*
+	 * Set our next event ID based on the largest event ID seen
+	 * by others in the cluster.  This way, if we've left and re-joined, we'll
+	 * start using an event ID that is unique.
+	 */
+	case EVT_SET_ID_OP: {
+		struct in_addr my_addr;
+		my_addr.s_addr = my_node->nodeId;
+		log_printf(LOG_LEVEL_DEBUG, 
+			"Received Set event ID OP from %x to %llx for %x my addr %x base %llx\n",
+					source_addr.s_addr, 
+					cpkt->u.chc_set_id.chc_last_id,
+					cpkt->u.chc_set_id.chc_addr.s_addr,
+					my_addr.s_addr,
+					base_id);	
+		if (cpkt->u.chc_set_id.chc_addr.s_addr == my_addr.s_addr) {
+			if (cpkt->u.chc_set_id.chc_last_id > base_id) {
+				log_printf(LOG_LEVEL_DEBUG, "Set event ID from %s to %llx\n",
+					inet_ntoa(source_addr), cpkt->u.chc_set_id.chc_last_id);	
+				base_id = cpkt->u.chc_set_id.chc_last_id + 1;
+			}
+		}
+		break;
+	}
+
+	/*
+	 * Process a config change.  Once we get all the messages from
+	 * the current membership, determine who delivers any retained events.
+	 */
+	case EVT_CONF_CHANGE: {
+		log_printf(LOG_LEVEL_DEBUG, 
+			"Receive EVT_CONF_CHANGE from %x members %d checked in %d, mn %x\n",
+				source_addr.s_addr, total_members, checked_in, mn);
+		if (mn) {
+		} else {
+			log_printf(LOG_LEVEL_WARNING, "NO NODE DATA AVAILABLE FOR %x\n",
+					source_addr.s_addr);
+		}
+		if (++checked_in == total_members) {
+			/*
+			 * We're all here, now figure out who should send the
+			 * retained events, if any.
+			 */
+			mn = oldest_node();
+			if (mn->mn_node_info.nodeId == my_node_id) {
+				log_printf(LOG_LEVEL_DEBUG, "I am oldest\n");
+				send_retained();
+			}
+			
+		}
+		break;
+	}
+
+	/*
+	 * OK, We're done with recovery, continue operations.
+	 */
+	case EVT_CONF_DONE: {
+		log_printf(LOG_LEVEL_DEBUG, 
+				"Receive EVT_CONF_DONE from %x\n", source_addr.s_addr);
+		in_cfg_change = 0;
+		gmi_recovery_plug_unplug (evt_recovery_plug_handle);
+		break;
+	}
+
 	default:
 		log_printf(LOG_LEVEL_NOTICE, "Invalid channel operation %d\n",
 						cpkt->chc_op);
===== exec/gmi.c 1.34 vs edited =====
--- 1.34/exec/gmi.c	2004-09-20 17:56:47 -07:00
+++ edited/exec/gmi.c	2004-09-24 10:27:38 -07:00
@@ -157,6 +157,12 @@
 	int refcount;
 };
 
+struct token_callback_instance {	/* one queued token callback */
+	struct list_head list;		/* linkage on token_callback_listhead */
+	int (*callback_fn) (void *);	/* called with data; -1 asks for a retry */
+	void *data;			/* argument handed to callback_fn */
+};
+
 /*
  * In-order pending delivery queue
  */
@@ -235,6 +241,7 @@
 };
 
 DECLARE_LIST_INIT (plug_listhead);
+DECLARE_LIST_INIT (token_callback_listhead);
 
 /*
  * Delivered up to and including
@@ -537,6 +544,15 @@
 }
 #endif
 
+int callback_print (void *data) {	/* debug token callback: print, then re-arm */
+	unsigned long dval = (unsigned long)data;	/* keep full pointer width */
+	unsigned long handle;
+	printf ("data is %lx\n", dval);
+	dval++;
+	gmi_token_callback_create (&handle, callback_print, (void *)dval);
+	return 0;	/* never asks token_callbacks_execute() for a retry */
+}
+
 /*
  * Exported interfaces
  */
@@ -552,6 +568,9 @@
 	int res;
 	int interface_no;
 
+//unsigned long handle;
+//gmi_token_callback_create (&handle, callback_print, 0);
+
 	/*
 	 * Initialize random number generator for later use to generate salt
 	 */
@@ -989,6 +1008,10 @@
 		assert (iovec[i].iov_len < MESSAGE_SIZE_MAX);
 	}
 
+	if (!gmi_send_ok (priority, total_size)) {
+			return -1;
+	}
+
 	packet_count = (total_size / packet_size);
 
 	gmi_log_printf (gmi_log_level_debug, "Message size is %d\n", total_size);
@@ -2901,6 +2924,58 @@
 	last_group_arut = orf_token->group_arut;
 }
 
+/* Queue callback_fn(data) for the next token_callbacks_execute() pass. */
+int gmi_token_callback_create (unsigned long *handle_out, 
+		int (*callback_fn) (void *), void *data)
+{
+	struct token_callback_instance *instance = malloc (sizeof (*instance));
+	if (!instance) {
+		return (-1);	/* allocation failed; *handle_out is left untouched */
+	}
+	list_init (&instance->list);
+	instance->callback_fn = callback_fn;
+	instance->data = data;
+	list_add (&instance->list, &token_callback_listhead);
+	*handle_out = (unsigned long)instance;	/* the handle is the instance address */
+	return (0);
+}
+
+/* Cancel a pending callback made by gmi_token_callback_create(): unlink it and
+ * free it.  NOTE(review): token_callbacks_execute() also frees run callbacks. */
+void gmi_token_callback_destroy (unsigned long handle)
+{
+	struct token_callback_instance *instance = (void *)handle;
+	list_del (&instance->list);
+	free (instance);
+}
+
+void token_callbacks_execute (void)
+{
+	struct list_head *list;
+	struct list_head *list_next;
+	struct token_callback_instance *token_callback_instance;
+	int res;
+	/* Run each queued token callback once, unlinking and freeing it. */
+	for (list = token_callback_listhead.next; list != &token_callback_listhead;
+		list = list_next) {
+		/* Save the successor first: the current entry is unlinked below. */
+		token_callback_instance = list_entry (list, 
+								struct token_callback_instance, list);
+		list_next = list->next;
+		list_del (list);
+		res = token_callback_instance->callback_fn (
+								token_callback_instance->data);
+		/*
+		 * Callback returned -1: requeue it for the next token and stop here.
+		 */
+		if (res == -1) {
+			list_add (list, &token_callback_listhead);
+			break;
+		}
+		free (token_callback_instance);
+	}
+}
+
 /*
  * Message Handlers
  */
@@ -3069,6 +3144,8 @@
 	 * Transmit orf_token to next member
 	 */
 	orf_token_send (orf_token, rtr_list, plug_bitmap, 1);
+
+	token_callbacks_execute ();
 
 	return (0);
 }
===== exec/gmi.h 1.8 vs edited =====
--- 1.8/exec/gmi.h	2004-09-20 15:18:16 -07:00
+++ edited/exec/gmi.h	2004-09-24 10:18:45 -07:00
@@ -141,4 +141,12 @@
 int gmi_recovery_plug_unplug (
 	gmi_recovery_plug_handle handle_recovery);
 
+int gmi_token_callback_create (
+	unsigned long *handle_out,
+	int (*callback_fn) (void *),
+	void *data);
+
+void gmi_token_callback_destroy (
+	unsigned long handle);
+
 #endif /* GMI_H_DEFINED */
===== include/ais_msg.h 1.9 vs edited =====
--- 1.9/include/ais_msg.h	2004-09-23 15:10:06 -07:00
+++ edited/include/ais_msg.h	2004-09-24 10:25:57 -07:00
@@ -201,7 +201,8 @@
 	MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE,
 	MESSAGE_REQ_EXEC_CKPT_SECTIONREAD,
 	MESSAGE_REQ_EXEC_EVT_EVENTDATA,
-	MESSAGE_REQ_EXEC_EVT_CHANCMD
+	MESSAGE_REQ_EXEC_EVT_CHANCMD,
+	MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA
 };
 
 enum req_evt_types {
@@ -998,12 +999,15 @@
  * MESSAGE_REQ_EVT_PUBLISH			(1)
  * MESSAGE_RES_EVT_EVENT_DATA		(2)
  * MESSAGE_REQ_EXEC_EVT_EVENTDATA	(3)
+ * MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA	(4)
  *
  * led_head:				Request/Results head
+ * led_in_addr:				address of node (4 only)
+ * led_receive_time:		Time that the message was received (4 only)
  * led_svr_channel_handle:	Server channel handle (1 only)
  * led_lib_channel_handle:	Lib channel handle (2 only)
- * led_chan_name:			Channel name (3 only)
- * led_event_id:			Event ID (2 and 3 only)
+ * led_chan_name:			Channel name (3 and 4 only)
+ * led_event_id:			Event ID (2, 3 and 4 only)
  * led_sub_id:				Subscription ID (2 only)
  * led_publisher_node_id:	Node ID of event publisher
  * led_publisher_name:		Node name of event publisher
@@ -1017,6 +1021,8 @@
  */
 struct lib_event_data {
 	struct res_header		led_head;
+	struct in_addr			led_in_addr;
+	SaTimeT					led_receive_time;
 	uint32_t				led_svr_channel_handle;
 	uint32_t				led_lib_channel_handle;
 	SaNameT					led_chan_name;
@@ -1045,7 +1051,7 @@
 struct res_evt_event_publish {
 
 	struct res_header	iep_head;
-	SaEvtEventIdT			iep_event_id;
+	SaEvtEventIdT		iep_event_id;
 };
 
 /*
@@ -1061,8 +1067,8 @@
 struct req_evt_event_clear_retentiontime {
 
 	struct req_header	iec_head;
-	uint64_t				iec_event_id;
-	uint32_t				iec_channel_handle;
+	SaEvtEventIdT		iec_event_id;
+	uint32_t			iec_channel_handle;
 
 };
 
@@ -1086,12 +1092,27 @@
  * chc_op:		Channel operation (open, close, clear retentiontime)
  */
 
+enum evt_chan_ops {	/* chc_op values for struct req_evt_chan_command */
+	EVT_OPEN_CHAN_OP,	/* open channel */
+	EVT_CLOSE_CHAN_OP,	/* close channel */
+	EVT_CLEAR_RET_OP,	/* clear an event's retention time */
+	EVT_SET_ID_OP,		/* set a node's next event ID base */
+	EVT_CONF_CHANGE,	/* node check-in after a configuration change */
+	EVT_CONF_DONE,		/* recovery finished, resume normal operation */
+};
+	
+struct evt_set_id {		/* payload of EVT_SET_ID_OP */
+	struct in_addr	chc_addr;	/* node whose event ID base is being set */
+	SaEvtEventIdT	chc_last_id;	/* last event ID seen for that node */
+};
+
 struct req_evt_chan_command {
 	struct req_header 	chc_head;
 	int 				chc_op;
 	union {
-		SaNameT			chc_chan;
-		SaEvtEventIdT	chc_event_id;
+		SaNameT				chc_chan;
+		SaEvtEventIdT		chc_event_id;
+		struct evt_set_id	chc_set_id;
 	} u;
 };
 #endif /* AIS_MSG_H_DEFINED */
===== lib/evt.c 1.5 vs edited =====
--- 1.5/lib/evt.c	2004-09-02 10:16:24 -07:00
+++ edited/lib/evt.c	2004-09-07 10:45:47 -07:00
@@ -1156,11 +1156,6 @@
 	pthread_mutex_lock(&edi->edi_mutex);
 
 	/*
-	 * TODO: Check to make sure that the corresponding channel handle
-	 * TODO:	is still valid (i.e. open)
-	 */
-	
-	/*
 	 * Go through the args and send back information if the pointer
 	 * isn't NULL
 	 */
@@ -1243,11 +1238,6 @@
 	}
 	pthread_mutex_lock(&edi->edi_mutex);
 
-	/*
-	 * TODO: Check to make sure that the corresponding channel handle
-	 * TODO:	is still valid (i.e. open)
-	 */
-
 	if (edi->edi_event_data && edi->edi_event_data_size) {
 		xfsize = min(*event_data_size, edi->edi_event_data_size);
 		*event_data_size = edi->edi_event_data_size;
@@ -1586,12 +1576,6 @@
 	 * calculate size needed to store the filters
 	 */
 	sz = filt_size(filters);
-
-	/*
-	 * TODO: Check to make sure that no filter string exceeds
-	 * TODO:	the maximum allowed by the specification
-	 */
-
 
 	req = malloc(sizeof(*req) + sz);
 	
===== test/subscription.c 1.1 vs edited =====
--- 1.1/test/subscription.c	2004-09-02 10:16:25 -07:00
+++ edited/test/subscription.c	2004-09-23 12:34:31 -07:00
@@ -15,6 +15,8 @@
 #include "ais_evt.h"
 
 #define  TEST_EVENT_ORDER 1
+#define  EVT_FREQ 1000
+uint32_t evt_count = 0;
 
 extern int get_sa_error(SaErrorT, char *, int);
 char result_buf[256];
@@ -260,12 +262,15 @@
 	}
 	if (evt_pat_get_array.patternsNumber > 0) {
 		if (strcmp(evt_pat_get_array.patterns[0].pattern, SA_EVT_LOST_EVENT) == 0) {
-			printf("*** Events have been dropped at %s\n",
+			printf("*** Events have been dropped at %s",
 				ais_time_str(publish_time));
 		}
 	}
 	if (quiet < 2) {
 		printf("event id: 0x%016llx\n", event_id);
+	}
+	if (quiet == 2) {
+		if ((++evt_count % EVT_FREQ) == 0) fprintf(stderr, ".");
 	}
 
 #ifdef TEST_EVENT_ORDER


More information about the Openais mailing list