[Openais] [PATCH] Message Service - Queue Group Tracking

Ryan O'Hara rohara at redhat.com
Mon Jul 14 12:35:02 PDT 2008


Attached is a patch which adds the ability to track changes to queue
groups. Please refer to the Message Service specifaction for more
information about these API calls.

This patch includes changes to saMsgQueueGroupInsert and
saMsgQueueGroupRemove which allow for notification via callback when
membership changes occur for a given queue group.

Comments appreciated.

Ryan

-------------- next part --------------
Index: include/ipc_msg.h
===================================================================
--- include/ipc_msg.h	(revision 1579)
+++ include/ipc_msg.h	(working copy)
@@ -178,10 +178,14 @@
 	mar_req_header_t header;
 	SaNameT queueGroupName;
 	SaUint8T trackFlags;
+	SaUint8T bufferFlag;
 };
 
 struct res_lib_msg_queuegrouptrack {
 	mar_res_header_t header;
+	SaNameT queueGroupName;
+	SaUint32T numberOfMembers;
+	SaMsgQueueGroupNotificationBufferT notificationBuffer;
 };
 
 struct req_lib_msg_queuegrouptrackstop {
Index: exec/msg.c
===================================================================
--- exec/msg.c	(revision 1579)
+++ exec/msg.c	(working copy)
@@ -101,16 +101,18 @@
 
 struct queue_group {
 	SaNameT name;
+	SaUint8T track_flags;
+	SaMsgQueueGroupPolicyT policy;
 	struct list_head list;
 	struct list_head message_queue_head;
-};	
+};
 
 struct queue_group_entry {
+	SaMsgQueueGroupChangesT change;
 	struct message_queue *message_queue;
 	struct list_head list;
 };
 
-
 /*
 struct queue_cleanup {
 	struct message_queue *queue;
@@ -540,6 +542,7 @@
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_group_name;
+	SaMsgQueueGroupPolicyT policy;
 };
 
 struct req_exec_msg_queuegroupinsert {
@@ -566,6 +569,8 @@
 	mar_req_header_t header;
 	mar_message_source_t source;
 	SaNameT queue_group_name;
+	SaUint8T track_flags;
+	SaUint8T buffer_flag;
 };
 
 struct req_exec_msg_queuegrouptrackstop {
@@ -654,10 +659,28 @@
 		entry = list_entry (list, struct message_entry, list);
 
 		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_message_list (%s) (%llu)\n",
-			    (char *)(entry->message.data), (unsigned long long)(entry->time));
+			    (char *)(entry->message.data),
+			    (unsigned long long)(entry->time));
 	}
 }
 
+static void print_queue_group_list (struct queue_group *group)
+{
+	struct list_head *list;
+	struct queue_group_entry *entry;
+
+	for (list = group->message_queue_head.next;
+	     list != &group->message_queue_head;
+	     list = list->next)
+	{
+		entry = list_entry (list, struct queue_group_entry, list);
+
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: print_queue_group_list (%s) (%u)\n",
+			    (char *)(entry->message_queue->name.value),
+			    (unsigned int)(entry->change));
+	}
+}
+
 static struct message_queue *queue_find (SaNameT *name)
 {
 	struct list_head *list;
@@ -679,41 +702,130 @@
 static struct queue_group *queue_group_find (SaNameT *name)
 {
 	struct list_head *list;
-	struct queue_group *queue_group;
+	struct queue_group *group;
 
 	for (list = queue_group_list_head.next;
 	     list != &queue_group_list_head;
 	     list = list->next)
 	{
-	        queue_group = list_entry (list, struct queue_group, list);
+	        group = list_entry (list, struct queue_group, list);
 
-		if (name_match (name, &queue_group->name)) {
-			return (queue_group);
+		if (name_match (name, &group->name)) {
+			return (group);
 		}
 	}
 	return (0);
 }
 
-static struct queue_group_entry *queue_group_entry_find (
-	struct queue_group *queue_group,
-	struct message_queue *queue)
+static struct queue_group_entry *queue_group_entry_find (struct queue_group *group, struct message_queue *queue)
 {
 	struct list_head *list;
-	struct queue_group_entry *queue_group_entry;
+	struct queue_group_entry *entry;
 
-	for (list = queue_group->message_queue_head.next;
-	     list != &queue_group->message_queue_head;
+	for (list = group->message_queue_head.next;
+	     list != &group->message_queue_head;
 	     list = list->next)
 	{
-	        queue_group_entry = list_entry (list, struct queue_group_entry, list);
-	
-		if (queue_group_entry->message_queue == queue) {
-			return (queue_group_entry);
+	        entry = list_entry (list, struct queue_group_entry, list);
+
+		if (entry->message_queue == queue) {
+			return (entry);
 		}
 	}
 	return (0);
 }
 
+static unsigned int queue_group_member_count (struct queue_group *group)
+{
+	struct list_head *list;
+
+	unsigned int count = 0;
+
+	for (list = group->message_queue_head.next;
+	     list != &group->message_queue_head;
+	     list = list->next)
+	{
+		count++;
+	}
+	return (count);
+}
+
+static unsigned int queue_group_change_count (struct queue_group *group)
+{
+	struct list_head *list;
+	struct queue_group_entry *entry;
+
+	unsigned int count = 0;
+
+	for (list = group->message_queue_head.next;
+	     list != &group->message_queue_head;
+	     list = list->next)
+	{
+		entry = list_entry (list, struct queue_group_entry, list);
+
+		if (entry->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) {
+			count++;
+		}
+	}
+	return (count);
+}
+
+static unsigned int queue_group_track (
+	struct queue_group *group,
+	unsigned int flags,
+	void *buffer)
+{
+	struct list_head *list;
+	struct queue_group_entry *entry;
+
+	unsigned int i = 0;
+
+	SaMsgQueueGroupNotificationT *notification =
+		(SaMsgQueueGroupNotificationT *) buffer;
+
+
+	switch (flags) {
+
+	case SA_TRACK_CURRENT:
+	case SA_TRACK_CHANGES:
+
+		for (list = group->message_queue_head.next;
+		     list != &group->message_queue_head;
+		     list = list->next)
+		{
+			entry = list_entry (list, struct queue_group_entry, list);
+			memcpy (&notification[i].member.queueName,
+				&entry->message_queue->name,
+				sizeof (SaNameT));
+			notification[i].change = entry->change;
+			i++;
+		}
+		break;
+
+	case SA_TRACK_CHANGES_ONLY:
+
+		for (list = group->message_queue_head.next;
+		     list != &group->message_queue_head;
+		     list = list->next)
+		{
+			entry = list_entry (list, struct queue_group_entry, list);
+			if (entry->change != SA_MSG_QUEUE_GROUP_NO_CHANGE) {
+				memcpy (&notification[i].member.queueName,
+					&entry->message_queue->name,
+					sizeof (SaNameT));
+				notification[i].change = entry->change;
+				i++;
+			}
+		}
+		break;
+
+	default:
+		break;
+	}
+
+	return (i);
+}
+
 static int msg_exec_init_fn (struct objdb_iface_ver0 *objdb)
 {
 	/*
@@ -1016,11 +1128,17 @@
 	struct req_exec_msg_queuegroupinsert *req_exec_msg_queuegroupinsert =
 		(struct req_exec_msg_queuegroupinsert *)message;
 	struct res_lib_msg_queuegroupinsert res_lib_msg_queuegroupinsert;
+	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
 	struct message_queue *queue;
 	struct queue_group *queue_group;
 	struct queue_group_entry *queue_group_entry;
+	SaMsgQueueGroupNotificationT *notification;
 	SaAisErrorT error = SA_AIS_OK;
+	SaAisErrorT error_cb = SA_AIS_OK;
 
+	unsigned int change_count = 0;
+	unsigned int member_count = 0;
+
 	queue_group = queue_group_find (&req_exec_msg_queuegroupinsert->queue_group_name);
 
 	if (queue_group == 0) {
@@ -1043,8 +1161,51 @@
 	list_init (&queue_group_entry->list);
 	list_add (&queue_group_entry->list, &queue_group->message_queue_head);
 	list_add (&queue->list, &queue_list_head);
+
 	queue_group_entry->message_queue = queue;
+	queue_group_entry->change = SA_MSG_QUEUE_GROUP_ADDED;
 
+	if (queue_group->track_flags & SA_TRACK_CHANGES) {
+		member_count = queue_group_member_count (queue_group);
+		change_count = queue_group_change_count (queue_group);
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		if (notification == NULL) {
+			error_cb = SA_AIS_ERR_NO_MEMORY;
+			goto error_track;
+		}
+
+		memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group,
+					   SA_TRACK_CHANGES,
+					   (void *)(notification));
+	}
+
+	if (queue_group->track_flags & SA_TRACK_CHANGES_ONLY) {
+		member_count = queue_group_member_count (queue_group);
+		change_count = queue_group_change_count (queue_group);
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * change_count);
+
+		if (notification == NULL) {
+			error_cb = SA_AIS_ERR_NO_MEMORY;
+			goto error_track;
+		}
+
+		memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * change_count);
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group,
+					   SA_TRACK_CHANGES_ONLY,
+					   (void *)(notification));
+	}
+
+error_track:
+	queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
+
 error_exit:
 	if (message_source_is_local(&req_exec_msg_queuegroupinsert->source)) {
 		res_lib_msg_queuegroupinsert.header.size =
@@ -1057,6 +1218,38 @@
 			req_exec_msg_queuegroupinsert->source.conn,
 			&res_lib_msg_queuegroupinsert,
 			sizeof (struct res_lib_msg_queuegroupinsert));
+
+		/*
+		 * Track changes (callback) if tracking is enabled
+		 */
+
+		if ((queue_group->track_flags & SA_TRACK_CHANGES) ||
+		    (queue_group->track_flags & SA_TRACK_CHANGES_ONLY))
+		{
+			res_lib_msg_queuegrouptrack.header.size =
+				(sizeof (struct res_lib_msg_queuegrouptrack) +
+				 (sizeof (SaMsgQueueGroupNotificationT) *
+				  res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+			res_lib_msg_queuegrouptrack.header.id =
+				MESSAGE_RES_MSG_QUEUEGROUPTRACK;
+			res_lib_msg_queuegrouptrack.header.error = error_cb;
+			res_lib_msg_queuegrouptrack.numberOfMembers = member_count;
+
+			memcpy (&res_lib_msg_queuegrouptrack.queueGroupName,
+				&req_exec_msg_queuegroupinsert->queue_group_name,
+				sizeof (SaNameT));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_queuegroupinsert->source.conn),
+				&res_lib_msg_queuegrouptrack,
+				sizeof (struct res_lib_msg_queuegrouptrack));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_queuegroupinsert->source.conn),
+				notification,
+				(sizeof (SaMsgQueueGroupNotificationT) *
+				 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+		}
 	}
 }
 
@@ -1067,11 +1260,17 @@
 	struct req_exec_msg_queuegroupremove *req_exec_msg_queuegroupremove =
 		(struct req_exec_msg_queuegroupremove *)message;
 	struct res_lib_msg_queuegroupremove res_lib_msg_queuegroupremove;
+	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
 	struct queue_group *queue_group;
 	struct message_queue *queue;
 	struct queue_group_entry *queue_group_entry;
+	SaMsgQueueGroupNotificationT *notification;
 	SaAisErrorT error = SA_AIS_OK;
+	SaAisErrorT error_cb = SA_AIS_OK;
 
+	unsigned int change_count = 0;
+	unsigned int member_count = 0;
+
 	queue_group = queue_group_find (&req_exec_msg_queuegroupremove->queue_group_name);
 	if (queue_group == 0) {
 		error = SA_AIS_ERR_NOT_EXIST;
@@ -1090,6 +1289,49 @@
 		goto error_exit;
 	}
 
+	queue_group_entry->change = SA_MSG_QUEUE_GROUP_REMOVED;
+
+	if (queue_group->track_flags & SA_TRACK_CHANGES) {
+		member_count = queue_group_member_count (queue_group);
+		change_count = queue_group_change_count (queue_group);
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		if (notification == NULL) {
+			error_cb = SA_AIS_ERR_NO_MEMORY;
+			goto error_track;
+		}
+
+		memset (notification, 0, (sizeof (SaMsgQueueGroupNotificationT) * member_count));
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group,
+					   SA_TRACK_CHANGES,
+					   (void *)(notification));
+	}
+
+	if (queue_group->track_flags & SA_TRACK_CHANGES_ONLY) {
+		member_count = queue_group_member_count (queue_group);
+		change_count = queue_group_change_count (queue_group);
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * change_count);
+
+		if (notification == NULL) {
+			error_cb = SA_AIS_ERR_NO_MEMORY;
+			goto error_track;
+		}
+
+		memset (notification, 0, (sizeof (SaMsgQueueGroupNotificationT) * change_count));
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group,
+					   SA_TRACK_CHANGES_ONLY,
+					   (void *)(notification));
+	}
+
+error_track:
+	queue_group_entry->change = SA_MSG_QUEUE_GROUP_NO_CHANGE;
+
 	list_del (&queue_group_entry->list);
 
 error_exit:
@@ -1104,6 +1346,38 @@
 			req_exec_msg_queuegroupremove->source.conn,
 			&res_lib_msg_queuegroupremove,
 			sizeof (struct res_lib_msg_queuegroupremove));
+
+		/*
+		 * Track changes (callback) if tracking is enabled
+		 */
+
+		if ((queue_group->track_flags & SA_TRACK_CHANGES) ||
+		    (queue_group->track_flags & SA_TRACK_CHANGES_ONLY))
+		{
+			res_lib_msg_queuegrouptrack.header.size =
+				(sizeof (struct res_lib_msg_queuegrouptrack) +
+				 (sizeof (SaMsgQueueGroupNotificationT) *
+				  res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+			res_lib_msg_queuegrouptrack.header.id =
+				MESSAGE_RES_MSG_QUEUEGROUPTRACK;
+			res_lib_msg_queuegrouptrack.header.error = error_cb;
+			res_lib_msg_queuegrouptrack.numberOfMembers = member_count;
+
+			memcpy (&res_lib_msg_queuegrouptrack.queueGroupName,
+				&req_exec_msg_queuegroupremove->queue_group_name,
+				sizeof (SaNameT));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_queuegroupremove->source.conn),
+				&res_lib_msg_queuegrouptrack,
+				sizeof (struct res_lib_msg_queuegrouptrack));
+
+			openais_conn_send_response (
+				openais_conn_partner_get (req_exec_msg_queuegroupremove->source.conn),
+				notification,
+				(sizeof (SaMsgQueueGroupNotificationT) *
+				 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+		}
 	}
 }
 
@@ -1144,22 +1418,153 @@
 	void *message,
 	unsigned int nodeid)
 {
-#if 0
 	struct req_exec_msg_queuegrouptrack *req_exec_msg_queuegrouptrack =
 		(struct req_exec_msg_queuegrouptrack *)message;
 	struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
-#endif
+	struct queue_group *queue_group;
+	SaAisErrorT error = SA_AIS_OK;
+
+	unsigned int change_count = 0;
+	unsigned int member_count = 0;
+
+	SaMsgQueueGroupNotificationT *notification;
+
+	queue_group = queue_group_find (&req_exec_msg_queuegrouptrack->queue_group_name);
+
+	if (queue_group == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	member_count = queue_group_member_count (queue_group);
+	change_count = queue_group_change_count (queue_group);
+
+	if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CURRENT) {
+		/* DEBUG */
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CURRENT\n");
+
+		notification = malloc (sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		if (notification == NULL) {
+			error = SA_AIS_ERR_NO_MEMORY;
+			goto error_exit;
+		}
+
+		memset (notification, 0, sizeof (SaMsgQueueGroupNotificationT) * member_count);
+
+		res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems =
+			queue_group_track (queue_group, SA_TRACK_CURRENT, (void *)(notification));
+	}
+
+	if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CHANGES) {
+		/* DEBUG */
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CHANGES\n");
+		queue_group->track_flags = req_exec_msg_queuegrouptrack->track_flags;
+	}
+
+	if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CHANGES_ONLY) {
+		/* DEBUG */
+		log_printf (LOG_LEVEL_NOTICE, "[DEBUG]: SA_TRACK_CHANGES_ONLY\n");
+		queue_group->track_flags = req_exec_msg_queuegrouptrack->track_flags;
+	}
+
+error_exit:
+	if (message_source_is_local(&req_exec_msg_queuegrouptrack->source)) {
+		res_lib_msg_queuegrouptrack.header.size =
+			sizeof (struct res_lib_msg_queuegrouptrack);
+		res_lib_msg_queuegrouptrack.header.id =
+			MESSAGE_RES_MSG_QUEUEGROUPTRACK;
+		res_lib_msg_queuegrouptrack.header.error = error;
+		res_lib_msg_queuegrouptrack.numberOfMembers = member_count;
+
+		memcpy (&res_lib_msg_queuegrouptrack.queueGroupName,
+			&req_exec_msg_queuegrouptrack->queue_group_name,
+			sizeof (SaNameT));
+
+		if (req_exec_msg_queuegrouptrack->track_flags & SA_TRACK_CURRENT) {
+			if (req_exec_msg_queuegrouptrack->buffer_flag) {
+				res_lib_msg_queuegrouptrack.header.size +=
+					(sizeof (SaMsgQueueGroupNotificationT) *
+					 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems);
+
+				openais_conn_send_response (
+					req_exec_msg_queuegrouptrack->source.conn,
+					&res_lib_msg_queuegrouptrack,
+					sizeof (struct res_lib_msg_queuegrouptrack));
+
+				openais_conn_send_response (
+					req_exec_msg_queuegrouptrack->source.conn,
+					notification,
+					(sizeof (SaMsgQueueGroupNotificationT) *
+					 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+			} else {
+				openais_conn_send_response (
+					req_exec_msg_queuegrouptrack->source.conn,
+					&res_lib_msg_queuegrouptrack,
+					sizeof (struct res_lib_msg_queuegrouptrack));
+
+				res_lib_msg_queuegrouptrack.header.size +=
+					(sizeof (SaMsgQueueGroupNotificationT) *
+					 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems);
+
+				openais_conn_send_response (
+					openais_conn_partner_get (req_exec_msg_queuegrouptrack->source.conn),
+					&res_lib_msg_queuegrouptrack,
+					sizeof (struct res_lib_msg_queuegrouptrack));
+
+				openais_conn_send_response (
+					openais_conn_partner_get (req_exec_msg_queuegrouptrack->source.conn),
+					notification,
+					(sizeof (SaMsgQueueGroupNotificationT) *
+					 res_lib_msg_queuegrouptrack.notificationBuffer.numberOfItems));
+			}
+		} else {
+			openais_conn_send_response (
+				req_exec_msg_queuegrouptrack->source.conn,
+				&res_lib_msg_queuegrouptrack,
+				sizeof (struct res_lib_msg_queuegrouptrack));
+		}
+	}
 }
 
 static void message_handler_req_exec_msg_queuegrouptrackstop (
 	void *message,
 	unsigned int nodeid)
 {
-#if 0
 	struct req_exec_msg_queuegrouptrackstop *req_exec_msg_queuegrouptrackstop =
 		(struct req_exec_msg_queuegrouptrackstop *)message;
 	struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop;
-#endif
+	struct queue_group *queue_group;
+	SaAisErrorT error = SA_AIS_OK;
+
+	queue_group = queue_group_find (&req_exec_msg_queuegrouptrackstop->queue_group_name);
+
+	if (queue_group == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	if ((queue_group->track_flags != SA_TRACK_CHANGES) &&
+	    (queue_group->track_flags != SA_TRACK_CHANGES_ONLY)) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	queue_group->track_flags = 0;
+
+error_exit:
+	if (message_source_is_local(&req_exec_msg_queuegrouptrackstop->source)) {
+		res_lib_msg_queuegrouptrackstop.header.size =
+			sizeof (struct res_lib_msg_queuegrouptrackstop);
+		res_lib_msg_queuegrouptrackstop.header.id =
+			MESSAGE_RES_MSG_QUEUEGROUPTRACKSTOP;
+		res_lib_msg_queuegrouptrackstop.header.error = error;
+
+		openais_conn_send_response (
+			req_exec_msg_queuegrouptrackstop->source.conn,
+			&res_lib_msg_queuegrouptrackstop,
+			sizeof (struct res_lib_msg_queuegrouptrackstop));
+	}
 }
 
 static void message_handler_req_exec_msg_messagesend (
@@ -1209,9 +1614,6 @@
 
 	list_add_tail (&entry->list, &queue->message_list_head);
 
-	/* DEBUG */
-	print_message_list (queue);
-
 error_exit:
 
 	if (message_source_is_local(&req_exec_msg_messagesend->source)) {
@@ -1536,6 +1938,9 @@
 	memcpy (&req_exec_msg_queuegroupcreate.queue_group_name,
 		&req_lib_msg_queuegroupcreate->queueGroupName, sizeof (SaNameT));
 
+	req_exec_msg_queuegroupcreate.policy =
+		req_lib_msg_queuegroupcreate->queueGroupPolicy;
+
 	iovec.iov_base = (char *)&req_exec_msg_queuegroupcreate;
 	iovec.iov_len = sizeof (req_exec_msg_queuegroupcreate);
 
@@ -1656,6 +2061,11 @@
 	memcpy (&req_exec_msg_queuegrouptrack.queue_group_name,
 		&req_lib_msg_queuegrouptrack->queueGroupName, sizeof (SaNameT));
 
+	req_exec_msg_queuegrouptrack.track_flags =
+		req_lib_msg_queuegrouptrack->trackFlags;
+	req_exec_msg_queuegrouptrack.buffer_flag =
+		req_lib_msg_queuegrouptrack->bufferFlag;
+
 	iovec.iov_base = (char *)&req_exec_msg_queuegrouptrack;
 	iovec.iov_len = sizeof (req_exec_msg_queuegrouptrack);
 
Index: lib/msg.c
===================================================================
--- lib/msg.c	(revision 1579)
+++ lib/msg.c	(working copy)
@@ -301,8 +301,8 @@
 
 	struct res_lib_msg_queueopenasync *res_lib_msg_queueopenasync;
 	struct res_lib_msg_messagesendasync *res_lib_msg_messagesendasync;
+	struct res_lib_msg_queuegrouptrack *res_lib_msg_queuegrouptrack;
 
-
 	if (dispatchFlags != SA_DISPATCH_ONE &&
 	    dispatchFlags != SA_DISPATCH_ALL &&
 	    dispatchFlags != SA_DISPATCH_BLOCKING)
@@ -354,6 +354,7 @@
 			pthread_mutex_unlock(&msgInstance->dispatch_mutex);
 			break; /* exit do while cont is 1 loop */
 		} else
+
 		if (dispatch_avail == 0) {
 			pthread_mutex_unlock(&msgInstance->dispatch_mutex);
 			continue;
@@ -366,6 +367,7 @@
 		if (error != SA_AIS_OK) {
 			goto error_unlock;
 		}
+
 		if (dispatch_data.header.size > sizeof (mar_res_header_t)) {
 			error = saRecvRetry (msgInstance->dispatch_fd, &dispatch_data.data,
 				dispatch_data.header.size - sizeof (mar_res_header_t));
@@ -453,6 +455,26 @@
 
 			break;
 
+		case MESSAGE_RES_MSG_QUEUEGROUPTRACK:
+
+			if (callbacks.saMsgQueueGroupTrackCallback == NULL) {
+				continue;
+			}
+			res_lib_msg_queuegrouptrack =
+				(struct res_lib_msg_queuegrouptrack *) &dispatch_data;
+
+			res_lib_msg_queuegrouptrack->notificationBuffer.notification =
+				(SaMsgQueueGroupNotificationT *)
+				(((char *) &dispatch_data) + sizeof (struct res_lib_msg_queuegrouptrack));
+
+			callbacks.saMsgQueueGroupTrackCallback (
+				&res_lib_msg_queuegrouptrack->queueGroupName,
+				&res_lib_msg_queuegrouptrack->notificationBuffer,
+				res_lib_msg_queuegrouptrack->numberOfMembers,
+				res_lib_msg_queuegrouptrack->header.error);
+
+			break;
+
 		default:
 			/* TODO */
 			break;
@@ -1117,6 +1139,22 @@
 		return (SA_AIS_ERR_INVALID_PARAM);
 	}
 
+	if ((notificationBuffer != NULL) &&
+	    (notificationBuffer->notification != NULL) &&
+	    (notificationBuffer->numberOfItems == 0)) {
+		return (SA_AIS_ERR_INVALID_PARAM);
+	}
+
+	if ((notificationBuffer != NULL) &&
+	    (notificationBuffer->notification == NULL)) {
+		notificationBuffer->numberOfItems = 0;
+	}
+
+	if ((trackFlags & SA_TRACK_CHANGES) &&
+	    (trackFlags & SA_TRACK_CHANGES_ONLY)) {
+		return (SA_AIS_ERR_BAD_FLAGS);
+	}
+
 	/* DEBUG */
 	printf ("[DEBUG]: saMsgQueueGroupTrack { queueGroupName = %s }\n",
 		(char *) queueGroupName->value);
@@ -1133,20 +1171,67 @@
 		MESSAGE_REQ_MSG_QUEUEGROUPTRACK;
 
 	req_lib_msg_queuegrouptrack.trackFlags = trackFlags;
+	req_lib_msg_queuegrouptrack.bufferFlag = (notificationBuffer != NULL);
 
+	/* DEBUG */
+	printf ("[DEBUG]: saMsgQueueGroupTrack { bufferFlag = %d }\n",
+		(int)(req_lib_msg_queuegrouptrack.bufferFlag));
+
 	memcpy (&req_lib_msg_queuegrouptrack.queueGroupName, queueGroupName,
 		sizeof (SaNameT));
 
 	pthread_mutex_lock (&msgInstance->response_mutex);
 
+	/*
 	error = saSendReceiveReply (msgInstance->response_fd,
 		&req_lib_msg_queuegrouptrack,
 		sizeof (struct req_lib_msg_queuegrouptrack),
 		&res_lib_msg_queuegrouptrack,
 		sizeof (struct res_lib_msg_queuegrouptrack));
+	*/
 
+	error = saSendRetry (msgInstance->response_fd, &req_lib_msg_queuegrouptrack,
+		sizeof (struct req_lib_msg_queuegrouptrack));
+	if (error != SA_AIS_OK) {
+		goto error_exit;
+	}
+
+	error = saRecvRetry (msgInstance->response_fd, &res_lib_msg_queuegrouptrack,
+		sizeof (struct res_lib_msg_queuegrouptrack));
+	if (error != SA_AIS_OK) {
+		goto error_exit;
+	}
+
+	if ((trackFlags & SA_TRACK_CURRENT) && (notificationBuffer != NULL)) {
+		if (notificationBuffer->notification != NULL) {
+			if (notificationBuffer->numberOfItems < res_lib_msg_queuegrouptrack.numberOfMembers) {
+				error = SA_AIS_ERR_NO_SPACE;
+				goto error_exit;
+			}
+		} else {
+			notificationBuffer->notification =
+				malloc (sizeof (SaMsgQueueGroupNotificationT) *
+					res_lib_msg_queuegrouptrack.numberOfMembers);
+
+			if (notificationBuffer->notification == NULL) {
+				error = SA_AIS_ERR_NO_MEMORY;
+				goto error_exit;
+			}
+
+			memset (notificationBuffer->notification, 0,
+				(sizeof (SaMsgQueueGroupNotificationT) *
+				 res_lib_msg_queuegrouptrack.numberOfMembers));
+		}
+
+		error = saRecvRetry (msgInstance->response_fd,
+				     notificationBuffer->notification,
+				     (sizeof (SaMsgQueueGroupNotificationT) *
+				      res_lib_msg_queuegrouptrack.numberOfMembers));
+	}
+
+error_exit:
 	pthread_mutex_unlock (&msgInstance->response_mutex);
-
+error_put_msg:
 	saHandleInstancePut (&msgHandleDatabase, msgHandle);
 
 	return (error == SA_AIS_OK ? res_lib_msg_queuegrouptrack.header.error : error);


More information about the Openais mailing list