[Openais] Re: recent segfault
Mark Haverkamp
markh at osdl.org
Wed Feb 2 08:37:22 PST 2005
On Tue, 2005-02-01 at 14:51 -0700, Steven Dake wrote:
[...]
>
> good idea Mark. The patch should be pretty easy to develop. I'm
> looking at the sort queue in use bug now. If you want to work up a
> patch for the continuation bit idea that would be cool.
Here is a patch. It seem to be running OK, but its hard to cause the
condition that this fixes. So take a close look. Also, I noticed that
the re-assembly loops are essentially duplicate code. Would you mind if
I look at collapsing those loops into one?
Mark.
===== exec/totempg.c 1.6 vs edited =====
--- 1.6/exec/totempg.c 2005-02-01 12:39:32 -08:00
+++ edited/exec/totempg.c 2005-02-02 08:30:12 -08:00
@@ -99,9 +99,21 @@
short type;
};
+/*
+ * totempg_mcast structure
+ *
+ * header: Identify the mcast.
+ * fragmented: Set if this message continues into next message
+ * continuation: Set if this message is a continuation from last message
+ * msg_count Indicates how many packed messages are contained
+ * in the mcast.
+ * Also, the size of each packed message and the messages themselves are
+ * appended to the end of this structure when sent.
+ */
struct totempg_mcast {
struct totempg_mcast_header header;
- short fragmented; /* This message continues into next message */
+ unsigned char fragmented;
+ unsigned char continuation;
short msg_count;
/*
* short msg_len[msg_count];
@@ -114,7 +126,8 @@
/*
* Maximum packet size for totem pg messages
*/
-#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - sizeof (struct totempg_mcast))
+#define TOTEMPG_PACKET_SIZE (TOTEMSRP_PACKET_SIZE_MAX - \
+ sizeof (struct totempg_mcast))
/*
* Local variables used for packing small messages
@@ -147,9 +160,19 @@
struct assembly *assembly_list[16]; // MAX PROCESSORS TODO
int assembly_list_entries = 0;
+/*
+ * Staging buffer for packed messages. Messages are staged in this buffer
+ * before sending. Multiple messages may fit which cuts down on the
+ * number of mcasts sent. If a message doesn't completely fit, then
+ * the mcast header has a fragment bit set that says that there are more
+ * data to follow. fragment_size is an index into the buffer. It indicates
+ * the size of message data and where to place new message data.
+ * fragment_contuation indicates whether the first packed message in
+ * the buffer is a continuation of a previously packed fragment.
+ */
static unsigned char fragmentation_data[TOTEMPG_PACKET_SIZE];
-
int fragment_size = 0;
+int fragment_continuation = 0;
static struct iovec iov_delv;
@@ -238,11 +261,14 @@
assert (assembly);
/*
- * Assemble the header into one block of data
- * Assemble the packet contents into one block of data to simplify delivery
+ * Assemble the header into one block of data and
+ * assemble the packet contents into one block of data to simplify delivery
*/
if (iov_len == 1) {
- /* message originated from external processor - 1 iovec for full msg */
+ /*
+ * This message originated from external processor
+ * because there is only one iovec for the full msg.
+ */
char *data;
int datasize;
@@ -263,7 +289,10 @@
memcpy (&assembly->data[assembly->index], &data[datasize],
iovec[0].iov_len - datasize);
} else {
- /* message originated from local processor - <1 iovec for full msg */
+ /*
+ * The message originated from local processor
+ * becasue there is greater than one iovec for then full msg.
+ */
h_index = 0;
for (i = 0; i < 2; i++) {
memcpy (&header[h_index], iovec[i].iov_base, iovec[i].iov_len);
@@ -284,7 +313,6 @@
}
if (endian_conversion_required) {
- mcast->fragmented = swab16 (mcast->fragmented);
mcast->msg_count = swab16 (mcast->msg_count);
for (i = 0; i < mcast->msg_count; i++) {
msg_lens[i] = swab16 (msg_lens[i]);
@@ -309,8 +337,19 @@
iov_delv.iov_base = &assembly->data[0];
iov_delv.iov_len = assembly->index + msg_lens[0];
for (i = 0; i < mcast->msg_count; i++) {
- assembly->index += msg_lens[i];
//printf ("app deliver\n");
+ /*
+ * If the first packed message is a continuation
+ * of a previous message, but the assembly buffer
+ * is empty, then we need to discard it since we can't
+ * assemble a complete message.
+ */
+ if (mcast->continuation && (assembly->index == 0)) {
+ assembly->index += msg_lens[i];
+ mcast->continuation = 0;
+ continue;
+ }
+ assembly->index += msg_lens[i];
app_deliver_fn (source_addr, &iov_delv, 1,
endian_conversion_required);
iov_delv.iov_base = &assembly->data[assembly->index];
@@ -326,6 +365,17 @@
iov_delv.iov_base = &assembly->data[0];
iov_delv.iov_len = assembly->index + msg_lens[0];
for (i = 0; i < mcast->msg_count - 1; i++) {
+ /*
+ * If the first packed message is a continuation
+ * of a previous message, but the assembly buffer
+ * is empty, then we need to discard it since we can't
+ * assemble a complete message.
+ */
+ if (mcast->continuation && (assembly->index == 0)) {
+ assembly->index += msg_lens[i];
+ mcast->continuation = 0;
+ continue;
+ }
assembly->index += msg_lens[i];
//printf ("app deliver\n");
app_deliver_fn (source_addr, &iov_delv, 1,
@@ -381,6 +431,14 @@
return (0);
}
mcast.fragmented = 0;
+
+ /*
+ * Was the first message in this buffer a continuation of a
+ * fragmented message?
+ */
+ mcast.continuation = fragment_continuation;
+ fragment_continuation = 0;
+
mcast.msg_count = mcast_packed_msg_count;
iovecs[0].iov_base = &mcast;
@@ -468,6 +526,7 @@
for (i = 0; i < iov_len; ) {
mcast.fragmented = 0;
+ mcast.continuation = fragment_continuation;
copy_len = iovec[i].iov_len - copy_base;
/*
@@ -498,11 +557,15 @@
/*
* if we're not on the last iovec or the iovec is too large to
- * fit, then indicate a fragment.
+ * fit, then indicate a fragment. This also means that the next
+ * message will have the continuation of this one.
*/
if ((i < (iov_len - 1)) ||
((copy_base + copy_len) < iovec[i].iov_len)) {
mcast.fragmented = 1;
+ fragment_continuation = 1;
+ } else {
+ fragment_continuation = 0;
}
/*
--
Mark Haverkamp <markh at osdl.org>
More information about the Openais
mailing list