[Openais] Re: recent segfault

Mark Haverkamp markh at osdl.org
Wed Feb 2 08:37:22 PST 2005


On Tue, 2005-02-01 at 14:51 -0700, Steven Dake wrote:
[...] 
> 
> good idea Mark.  The patch should be pretty easy to develop.  I'm
> looking at the sort queue in use bug now.  If you want to work up a
> patch for the continuation bit idea that would be cool.

Here is a patch.  It seems to be running OK, but it's hard to cause the
condition that this fixes.  So take a close look.  Also, I noticed that
the re-assembly loops are essentially duplicate code.  Would you mind if
I look at collapsing those loops into one?

Mark.


===== exec/totempg.c 1.6 vs edited =====
--- 1.6/exec/totempg.c	2005-02-01 12:39:32 -08:00
+++ edited/exec/totempg.c	2005-02-02 08:30:12 -08:00
@@ -99,9 +99,21 @@
 	short type;
 };
 
+/*
+ * totempg_mcast structure
+ *
+ * header:				Identify the mcast.
+ * fragmented:			Set if this message continues into next message
+ * continuation:		Set if this message is a continuation from last message
+ * msg_count:			Indicates how many packed messages are contained
+ * 						in the mcast.
+ * Also, the size of each packed message and the messages themselves are
+ * appended to the end of this structure when sent.
+ */
 struct totempg_mcast {
 	struct totempg_mcast_header header;
-	short fragmented; /* This message continues into next message */
+	unsigned char fragmented; 
+	unsigned char continuation; 
 	short msg_count;
 	/*
 	 * short msg_len[msg_count];
@@ -114,7 +126,8 @@
 /*
  * Maximum packet size for totem pg messages
  */
-#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - sizeof (struct totempg_mcast))
+#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - \
+				sizeof (struct totempg_mcast))
 
 /*
  * Local variables used for packing small messages
@@ -147,9 +160,19 @@
 struct assembly *assembly_list[16]; // MAX PROCESSORS TODO
 int assembly_list_entries = 0;
 
+/*
+ * Staging buffer for packed messages.  Messages are staged in this buffer
+ * before sending.  Multiple messages may fit which cuts down on the 
+ * number of mcasts sent.  If a message doesn't completely fit, then 
+ * the mcast header has a fragment bit set that says that there are more 
+ * data to follow.  fragment_size is an index into the buffer.  It indicates
+ * the size of message data and where to place new message data.  
+ * fragment_continuation indicates whether the first packed message in 
+ * the buffer is a continuation of a previously packed fragment.
+ */
 static unsigned char fragmentation_data[TOTEMPG_PACKET_SIZE];
-
 int fragment_size = 0;
+int fragment_continuation = 0;
 
 static struct iovec iov_delv;
 
@@ -238,11 +261,14 @@
 	assert (assembly);
 
 	/*
-	 * Assemble the header into one block of data
-	 * Assemble the packet contents into one block of data to simplify delivery
+	 * Assemble the header into one block of data and
+	 * assemble the packet contents into one block of data to simplify delivery
 	 */
 	if (iov_len == 1) {
-		/* message originated from external processor - 1 iovec for full msg */
+		/* 
+		 * This message originated from external processor 
+		 * because there is only one iovec for the full msg.
+		 */
 		char *data;
 		int datasize;
 
@@ -263,7 +289,10 @@
 		memcpy (&assembly->data[assembly->index], &data[datasize],
 			iovec[0].iov_len - datasize);
 	} else {
-		/* message originated from local processor  - <1 iovec for full msg */
+		/* 
+		 * The message originated from local processor  
+		 * because there is more than one iovec for the full msg.
+		 */
 		h_index = 0;
 		for (i = 0; i < 2; i++) {
 			memcpy (&header[h_index], iovec[i].iov_base, iovec[i].iov_len);
@@ -284,7 +313,6 @@
 	}
 
 	if (endian_conversion_required) {
-		mcast->fragmented = swab16 (mcast->fragmented);
 		mcast->msg_count = swab16 (mcast->msg_count);
 		for (i = 0; i < mcast->msg_count; i++) {
 			msg_lens[i] = swab16 (msg_lens[i]);
@@ -309,8 +337,19 @@
 		iov_delv.iov_base = &assembly->data[0];
 		iov_delv.iov_len = assembly->index + msg_lens[0];
 		for  (i = 0; i < mcast->msg_count; i++) {
-			assembly->index += msg_lens[i];
 //printf ("app deliver\n");
+			/*
+			 * If the first packed message is a continuation
+			 * of a previous message, but the assembly buffer
+			 * is empty, then we need to discard it since we can't
+			 * assemble a complete message.
+			 */
+			if (mcast->continuation && (assembly->index == 0)) {
+				assembly->index += msg_lens[i];
+				mcast->continuation = 0;
+				continue;
+			}
+			assembly->index += msg_lens[i];
 			app_deliver_fn (source_addr, &iov_delv, 1,
 				endian_conversion_required);
 			iov_delv.iov_base = &assembly->data[assembly->index];
@@ -326,6 +365,17 @@
 		iov_delv.iov_base = &assembly->data[0];
 		iov_delv.iov_len = assembly->index + msg_lens[0];
 		for  (i = 0; i < mcast->msg_count - 1; i++) {
+			/*
+			 * If the first packed message is a continuation
+			 * of a previous message, but the assembly buffer
+			 * is empty, then we need to discard it since we can't
+			 * assemble a complete message.
+			 */
+			if (mcast->continuation && (assembly->index == 0)) {
+				assembly->index += msg_lens[i];
+				mcast->continuation = 0;
+				continue;
+			}
 			assembly->index += msg_lens[i];
 //printf ("app deliver\n");
 			app_deliver_fn (source_addr, &iov_delv, 1,
@@ -381,6 +431,14 @@
 		return (0);
 	}
 	mcast.fragmented = 0;
+
+	/*
+	 * Was the first message in this buffer a continuation of a
+	 * fragmented message?
+	 */
+	mcast.continuation = fragment_continuation;
+	fragment_continuation = 0;
+
 	mcast.msg_count = mcast_packed_msg_count;
 
 	iovecs[0].iov_base = &mcast;
@@ -468,6 +526,7 @@
 
 	for (i = 0; i < iov_len; ) {
 		mcast.fragmented = 0;
+		mcast.continuation = fragment_continuation;
 		copy_len = iovec[i].iov_len - copy_base;
 
 		/*
@@ -498,11 +557,15 @@
 
 			/*
 			 * if we're not on the last iovec or the iovec is too large to
-			 * fit, then indicate a fragment.
+			 * fit, then indicate a fragment. This also means that the next
+			 * message will have the continuation of this one.
 			 */
 			if ((i < (iov_len - 1)) || 
 					((copy_base + copy_len) < iovec[i].iov_len)) {
 				mcast.fragmented = 1;
+				fragment_continuation = 1;
+			} else {
+				fragment_continuation = 0;
 			}
 
 			/*

-- 
Mark Haverkamp <markh at osdl.org>




More information about the Openais mailing list