[Openais] Patch II design review revision 2 !!

Muni Bajpai muniba at nortel.com
Fri Feb 25 14:10:20 PST 2005


Skipped content of type multipart/alternative
-------------- next part --------------
diff -uNr --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet --exclude=init --exclude=LICENSE --exclude=Makefile --exclude=man --exclude=README.devmap --exclude=SECURITY --exclude=TODO --exclude=CHANGELOG --exclude=conf --exclude=loc --exclude=Makefile.samples --exclude=QUICKSTART --exclude=test --exclude=.cdtproject --exclude=.project ../latest/exec/ckpt.c ../bk_openais/exec/ckpt.c
--- ../latest/exec/ckpt.c	2005-02-25 14:07:12.000000000 -0600
+++ ../bk_openais/exec/ckpt.c	2005-02-25 15:07:03.000000000 -0600
@@ -16,7 +16,7 @@
  *   this list of conditions and the following disclaimer in the documentation
  *   and/or other materials provided with the distribution.
  * - Neither the name of the MontaVista Software, Inc. nor the names of its
  *   contributors may be used to endorse or promote products derived from this
  *   software without specific prior written permission.
  *
  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
@@ -57,18 +56,36 @@
 #include "totempg.h"
 
 #define LOG_SERVICE LOG_SERVICE_CKPT
+#define CKPT_MAX_SECTION_DATA_SEND (1024*400) /* max section data bytes per SYNCHRONIZESECTION message */
 #include "print.h"
 
-DECLARE_LIST_INIT(checkpointListHead);
+DECLARE_LIST_INIT(checkpoint_list_head); /* checkpoints searched by ckpt_checkpoint_find_global */
 
-DECLARE_LIST_INIT(checkpointIteratorListHead);
+DECLARE_LIST_INIT(checkpoint_iterator_list_head); /* section iterators added by library connections */
+
+DECLARE_LIST_INIT(recovery_sync_list_head); /* struct checkpoint_sync messages queued for ckpt_recovery_process */
+
+DECLARE_LIST_INIT(checkpoint_recovery_list_head); /* checkpoint copies made by ckpt_recovery_inititialize */
 
 struct checkpoint_cleanup {
     struct list_head list;
     struct saCkptCheckpoint *checkpoint;
 };
 
-//TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle;
+struct checkpoint_sync { /* one queued recovery message; section data, if any, follows it in memory */
+	struct list_head list; /* link in recovery_sync_list_head */
+	enum nodeexec_message_types type; /* SYNCHRONIZESTATE or SYNCHRONIZESECTION: selects the base member */
+	union {
+		struct req_exec_ckpt_synchronize_state request_state;
+		struct req_exec_ckpt_synchronize_section request_section;
+	} base; /* request payload handed to totempg_mcast by ckpt_recovery_process */
+};
+
+#ifdef TODO
+static void *tok_call_handle = 0; /* token callback handle used to reschedule ckpt_recovery_process */
+#endif
+
+/* TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle; */
 
 static int ckpt_exec_init_fn (void);
 
@@ -78,6 +95,10 @@
 
 static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct in_addr source_addr, int endian_conversion_required);
 
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required); /* recovery: reopen checkpoint, recreate section */
+
+static int message_handler_req_exec_ckpt_synchronize_section (void *message, struct in_addr source_addr, int endian_conversion_required); /* recovery: write one chunk of section data */
+
 static int message_handler_req_exec_ckpt_checkpointclose (void *message, struct in_addr source_addr, int endian_conversion_required);
 
 static int message_handler_req_exec_ckpt_checkpointunlink (void *message, struct in_addr source_addr, int endian_conversion_required);
@@ -133,24 +154,27 @@
 static int message_handler_req_lib_ckpt_sectioniteratorinitialize (struct conn_info *conn_info, void *message);
 static int message_handler_req_lib_ckpt_sectioniteratornext (struct conn_info *conn_info, void *message);
 
-static int ckpt_confchg_fn (
-	enum totempg_configuration_type configuration_type,
-	struct in_addr *member_list, void *member_list_private,
-		int member_list_entries,
-	struct in_addr *left_list, void *left_list_private,
-		int left_list_entries,
-	struct in_addr *joined_list, void *joined_list_private,
-		int joined_list_entries,
-	struct memb_ring_id *ring_id) {
+/* Checkpoint recovery helpers (state resynchronization after a configuration change). */
+static void ckpt_recovery_inititialize (void);
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *data);
+static void ckpt_recovery_finalize (void);
+static int ckpt_recovery_abort (void);
+static void ckpt_recovery_process_members_exit (struct in_addr *left_list, int left_list_entries);
+static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor, SaNameT *checkpointName);
+static int recovery_section_write (SaCkptSectionIdT *sectionId, SaNameT *checkpointName, void *newData, SaUint32T dataOffSet, SaUint32T dataSize);
 
-#ifdef TODO
-	if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
-		totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
-	}
-#endif
 
-	return (0);
-}
+static struct memb_ring_id saved_ring_id; /* ring id recorded at the last regular configuration (set up in ckpt_exec_init_fn) */
+
+static int ckpt_confchg_fn(
+		enum totempg_configuration_type configuration_type,
+		struct in_addr *member_list, void *member_list_private,
+			int member_list_entries,
+		struct in_addr *left_list, void *left_list_private,
+			int left_list_entries,
+		struct in_addr *joined_list, void *joined_list_private,
+			int joined_list_entries,
+		struct memb_ring_id *ring_id);
 
 struct libais_handler ckpt_libais_handlers[] =
 {
@@ -231,7 +255,7 @@
 	},
 	{ /* 15 */
 		.libais_handler_fn	= message_handler_req_lib_ckpt_checkpointsynchronizeasync,
-		.response_size		= sizeof (struct res_lib_ckpt_checkpointsynchronizeasync), // TODO RESPONSE
+		.response_size		= sizeof (struct res_lib_ckpt_checkpointsynchronizeasync), /* TODO RESPONSE */
 		.response_id		= MESSAGE_RES_CKPT_CHECKPOINT_CHECKPOINTSYNCHRONIZEASYNC,
 	},
 	{ /* 16 */
@@ -258,7 +282,9 @@
 	message_handler_req_exec_ckpt_sectionexpirationtimeset,
 	message_handler_req_exec_ckpt_sectionwrite,
 	message_handler_req_exec_ckpt_sectionoverwrite,
-	message_handler_req_exec_ckpt_sectionread
+	message_handler_req_exec_ckpt_sectionread,
+	message_handler_req_exec_ckpt_synchronize_state,
+	message_handler_req_exec_ckpt_synchronize_section
 };
 
 struct service_handler ckpt_service_handler = {
@@ -273,18 +299,395 @@
 	.exec_dump_fn				= 0
 };
 
-static struct memb_ring_id saved_ring_id;
+/*
+ * setProcessorIndex - claim a slot in ckpt_refcount for proc_addr.
+ * Slots are visited in index order.  Reaching a slot that already holds
+ * proc_addr returns -1 (the processor is already recorded); reaching an
+ * empty slot (address 0) first claims it: proc_addr is copied into the
+ * slot and its index is returned.  Only the address is written, the
+ * slot's count is left as it was.  Returns -1 as well when no slot is
+ * free.
+ */
+static int setProcessorIndex(struct in_addr *proc_addr,
+	struct ckpt_refcnt *ckpt_refcount)
+{
+	int slot;
+	struct ckpt_refcnt *entry;
+
+	for (slot = 0; slot < PROCESSOR_COUNT_MAX; slot++) {
+		entry = &ckpt_refcount[slot];
+		if (entry->addr.s_addr == proc_addr->s_addr)
+			return -1;
+		if (entry->addr.s_addr != 0)
+			continue;
+		memcpy(&entry->addr, proc_addr, sizeof(struct in_addr));
+		return slot;
+	}
+	return -1;
+}
+
+/*
+ * findProcessorIndex - return the index of the first ckpt_refcount slot
+ * whose stored address equals *proc_addr, or -1 if none does.  Empty
+ * slots hold address 0, so a 0.0.0.0 lookup matches the first empty slot.
+ */
+static int findProcessorIndex(struct in_addr *proc_addr,
+	struct ckpt_refcnt *ckpt_refcount)
+{
+	int slot = 0;
+
+	while (slot < PROCESSOR_COUNT_MAX) {
+		if (ckpt_refcount[slot].addr.s_addr == proc_addr->s_addr)
+			return slot;
+		slot++;
+	}
+	return -1;
+}
+
+static void initialize_ckpt_refcount_array (struct ckpt_refcnt *ckpt_refcount) { /* zero all PROCESSOR_COUNT_MAX slots */
+	memset (ckpt_refcount, 0, sizeof (*ckpt_refcount) * PROCESSOR_COUNT_MAX);
+}
+
+/*
+ * Snapshot state for recovery after a transitional configuration: copy each
+ * checkpoint onto checkpoint_recovery_list_head and, for every section of a
+ * checkpoint this processor has a ckpt_refcount slot in, queue on
+ * recovery_sync_list_head one SYNCHRONIZESTATE message followed by
+ * SYNCHRONIZESECTION messages carrying the section data in chunks of at most
+ * CKPT_MAX_SECTION_DATA_SEND bytes.  ckpt_recovery_process multicasts them.
+ */
+static void ckpt_recovery_inititialize () {
+	struct list_head *checkpoint_list;
+	struct saCkptCheckpoint *checkpoint;
+	struct checkpoint_sync *sync_msg;
+	struct checkpoint_sync *sync_section_msg;
+	struct saCkptCheckpoint *savedCheckpoint;
+	struct list_head *checkpoint_section_list;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	SaSizeT origSectionSize;
+	SaSizeT sectionDataSent;
+	SaSizeT newSectionSize;
+	int proc_index;
+
+	for (checkpoint_list = checkpoint_list_head.next;
+		checkpoint_list != &checkpoint_list_head;
+		checkpoint_list = checkpoint_list->next) {
+
+		checkpoint = list_entry (checkpoint_list,
+			struct saCkptCheckpoint, list);
+
+		/*
+		 * Save a copy of the checkpoint on the recovery list.
+		 * NOTE(review): shallow copy -- checkpointSectionsListHead still
+		 * links the original's section nodes, which point back at the
+		 * original checkpoint's list head.
+		 */
+		savedCheckpoint = malloc (sizeof (*savedCheckpoint));
+		assert (savedCheckpoint);
+		memcpy (savedCheckpoint, checkpoint, sizeof (*savedCheckpoint));
+		list_init (&savedCheckpoint->list);
+		list_add (&savedCheckpoint->list, &checkpoint_recovery_list_head);
+
+		/* The processor slot is the same for every section: look it up once. */
+		proc_index = findProcessorIndex (&this_ip.sin_addr,
+			savedCheckpoint->ckpt_refcount);
+		if (proc_index == -1) {
+			log_printf (LOG_LEVEL_ERROR,
+				"CKPT: Could not find the checkpoint entry for this processor %p \n",
+				checkpoint);
+			continue;
+		}
+
+		for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next;
+			checkpoint_section_list != &checkpoint->checkpointSectionsListHead;
+			checkpoint_section_list = checkpoint_section_list->next) {
+
+			ckptCheckpointSection = list_entry (checkpoint_section_list,
+				struct saCkptCheckpointSection, list);
+
+			/*
+			 * Queue a Sync State message for this section.  Messages are
+			 * inserted after the current last element (list_add is assumed to
+			 * insert after the node passed) so the queue is FIFO: the state
+			 * message that creates a section is sent before the section's data.
+			 */
+			sync_msg = malloc (sizeof (*sync_msg));
+			assert (sync_msg);
+			sync_msg->type = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE;
+			sync_msg->base.request_state.header.size = sizeof (struct req_exec_ckpt_synchronize_state);
+			sync_msg->base.request_state.header.id = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE;
+			memcpy (&sync_msg->base.request_state.previous_ring_id, &saved_ring_id, sizeof (struct memb_ring_id));
+			memcpy (&sync_msg->base.request_state.checkpointName, &savedCheckpoint->name, sizeof (SaNameT));
+			memcpy (&sync_msg->base.request_state.checkpointCreationAttributes,
+				&savedCheckpoint->checkpointCreationAttributes,
+				sizeof (SaCkptCheckpointCreationAttributesT));
+			/* NOTE(review): sectionId.id is a local pointer, meaningless on other nodes. */
+			memcpy (&sync_msg->base.request_state.sectionDescriptor,
+				&ckptCheckpointSection->sectionDescriptor,
+				sizeof (SaCkptSectionDescriptorT));
+			sync_msg->base.request_state.source_addr = this_ip.sin_addr;
+			sync_msg->base.request_state.ref_count =
+				savedCheckpoint->ckpt_refcount[proc_index].count;
+			list_init (&sync_msg->list);
+			list_add (&sync_msg->list, recovery_sync_list_head.prev);
+
+			origSectionSize = ckptCheckpointSection->sectionDescriptor.sectionSize;
+			sectionDataSent = 0;
+
+			/*
+			 * Queue Sync Section messages carrying the data in chunks of at
+			 * most CKPT_MAX_SECTION_DATA_SEND bytes, each chunk stored right
+			 * after its struct checkpoint_sync.
+			 */
+			while (sectionDataSent < origSectionSize) {
+				newSectionSize = origSectionSize - sectionDataSent;
+				if (newSectionSize > CKPT_MAX_SECTION_DATA_SEND) {
+					newSectionSize = CKPT_MAX_SECTION_DATA_SEND;
+				}
+
+				sync_section_msg = malloc (sizeof (struct checkpoint_sync) + newSectionSize);
+				assert (sync_section_msg);
+				sync_section_msg->type = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION;
+				sync_section_msg->base.request_section.header.size = sizeof (struct req_exec_ckpt_synchronize_section);
+				sync_section_msg->base.request_section.header.id = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION;
+				memcpy (&sync_section_msg->base.request_section.checkpointName, &savedCheckpoint->name, sizeof (SaNameT));
+				memcpy (&sync_section_msg->base.request_section.sectionId,
+					&ckptCheckpointSection->sectionDescriptor.sectionId,
+					sizeof (SaCkptSectionIdT));
+				/*
+				 * Assign rather than memcpy sizeof (SaUint32T) bytes of an SaSizeT:
+				 * if SaSizeT is wider, memcpy picks the wrong bytes on big-endian hosts.
+				 */
+				sync_section_msg->base.request_section.dataOffSet = (SaUint32T)sectionDataSent;
+				sync_section_msg->base.request_section.dataSize = (SaUint32T)newSectionSize;
+				memcpy ((char *)sync_section_msg + sizeof (struct checkpoint_sync),
+					(char *)ckptCheckpointSection->sectionData + sectionDataSent,
+					newSectionSize);
+				sectionDataSent += newSectionSize;
+				list_init (&sync_section_msg->list);
+				list_add (&sync_section_msg->list, recovery_sync_list_head.prev);
+			}
+		}
+	}
+#ifdef TODO
+	int res = totempg_token_callback_create (&tok_call_handle,
+		TOTEMPG_CALLBACK_TOKEN_SENT, 0, ckpt_recovery_process, NULL);
+#endif
+}
+
+/*
+ * Drain recovery_sync_list_head: multicast the queued sync messages in list
+ * order while totempg_send_ok reports room in the outbound queue.  A message
+ * is unlinked and freed only after it has been passed to totempg_mcast.  When
+ * the list becomes empty ckpt_recovery_finalize is called; if the outbound
+ * queue fills first the remaining messages stay queued (rescheduling through
+ * a token callback is still TODO).  type and data are unused; returns 0.
+ */
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *data) {
+	struct req_exec_ckpt_synchronize_state *request_exec_sync_state;
+	struct req_exec_ckpt_synchronize_section *request_exec_sync_section;
+	struct iovec iovecs[2];
+	struct list_head *sync_list;
+	struct checkpoint_sync *sync_element;
+	int res;
+
+	/*
+	 * Check for empty list here
+	 */
+	if (list_empty (&recovery_sync_list_head)) {
+		return (0);
+	}
+
+	while (1) { /* Go for as long as the outbound queue is not full */
+		/*
+		 * Extract the element at the head of the queue
+		 */
+		sync_list = recovery_sync_list_head.next;
+		sync_element = list_entry (sync_list,
+			struct checkpoint_sync, list);
+
+		log_printf (LOG_LEVEL_DEBUG, "callback recover_process.\n");
+		if (sync_element->type == MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE) {
+			/*
+			 * Sync State Request.  iov_base must be the request's address,
+			 * not the address of the local pointer variable holding it.
+			 */
+			request_exec_sync_state = &sync_element->base.request_state;
+			iovecs[0].iov_base = (char *)request_exec_sync_state;
+			iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_state);
+			/*
+			 * Mcast only if the outbound queue can take the message, else stop.
+			 * The mcast is kept outside assert() so it still runs with NDEBUG.
+			 */
+			if (totempg_send_ok (iovecs[0].iov_len)) {
+				res = totempg_mcast (iovecs, 1, TOTEMPG_AGREED);
+				assert (res == 0);
+				log_printf (LOG_LEVEL_DEBUG, "CKPT: Multicasted Sync State Message.\n");
+			} else {
+				log_printf (LOG_LEVEL_DEBUG, "CKPT: Outbound Queue full need to Create Callback.\n");
+				break;
+			}
+		} else {
+			/*
+			 * Sync Section Request followed by the section data, which
+			 * is stored directly after the struct checkpoint_sync.
+			 */
+			request_exec_sync_section = &sync_element->base.request_section;
+			iovecs[0].iov_base = (char *)request_exec_sync_section;
+			iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_section);
+			iovecs[1].iov_base = (char *)sync_element + sizeof (struct checkpoint_sync);
+			iovecs[1].iov_len = request_exec_sync_section->dataSize;
+			/* As above: mcast only if the outbound queue has room. */
+			if (totempg_send_ok (iovecs[0].iov_len + iovecs[1].iov_len)) {
+				res = totempg_mcast (iovecs, 2, TOTEMPG_AGREED);
+				assert (res == 0);
+				log_printf (LOG_LEVEL_DEBUG, "CKPT: Multicasted Sync Section Message.\n");
+			} else {
+				log_printf (LOG_LEVEL_DEBUG, "CKPT: Outbound Queue full need to Create Callback.\n");
+				break;
+			}
+		}
+		list_del (&sync_element->list);
+		free (sync_element);
+
+		/*
+		 * Everything queued has been sent: complete the recovery.
+		 */
+		if (list_empty (&recovery_sync_list_head)) {
+			ckpt_recovery_finalize ();
+			return (0);
+		}
+	}
+
+#ifdef TODO
+	/*
+	 * We have more to send ...
+	 */
+	res = totempg_token_callback_create (&tok_call_handle,
+		TOTEMPG_CALLBACK_TOKEN_SENT, 0, ckpt_recovery_process, NULL);
+#endif
+	return (0);
+}
+
+/*
+ * Finish recovery: free every checkpoint on checkpoint_list_head, then make
+ * the copies saved on checkpoint_recovery_list_head the active list and
+ * leave the recovery list empty.  NOTE(review): sections are not freed; the
+ * copies (shallow memcpy in ckpt_recovery_inititialize) share the section
+ * nodes, whose first/last links still point at the freed original's list
+ * head -- TODO re-anchor them before the copies' sections are traversed.
+ */
+static void ckpt_recovery_finalize () {
+	struct saCkptCheckpoint *checkpoint;
+
+	/* Remove and free all elements of the old checkpoint list. */
+	while (!list_empty (&checkpoint_list_head)) {
+		checkpoint = list_entry (checkpoint_list_head.next,
+			struct saCkptCheckpoint, list);
+		list_del (&checkpoint->list);
+		free (checkpoint);
+	}
+	list_init (&checkpoint_list_head);
+
+	/*
+	 * Transplant the saved checkpoints.  Copying the head struct alone would
+	 * leave the first and last elements pointing at the recovery head (and an
+	 * empty recovery list would make checkpoint_list_head point at it).
+	 */
+	if (!list_empty (&checkpoint_recovery_list_head)) {
+		checkpoint_list_head.next = checkpoint_recovery_list_head.next;
+		checkpoint_list_head.prev = checkpoint_recovery_list_head.prev;
+		checkpoint_list_head.next->prev = &checkpoint_list_head;
+		checkpoint_list_head.prev->next = &checkpoint_list_head;
+	}
+	list_init (&checkpoint_recovery_list_head);
+}
+
+/* Abort an in-progress recovery.  TODO: not implemented; always returns 0. */
+static int ckpt_recovery_abort (void)
+{
+	/* No abort logic yet. */
+	return (0);
+}
+
+/*
+ * Drop the checkpoint references held by processors that left the
+ * configuration: for each address in left_list, every checkpoint on
+ * checkpoint_list_head with a ckpt_refcount slot for it has that slot's
+ * count subtracted from referenceCount (never below zero), and the slot
+ * is cleared.  NOTE(review): unlike the close path, this neither releases
+ * unlinked checkpoints nor arms the retention timer when the count hits 0.
+ */
+static void ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries) {
+	struct list_head *checkpoint_list;
+	struct saCkptCheckpoint *checkpoint;
+	int index;
+	int i;
+
+	for (i = 0; i < left_list_entries; i++) {
+		for (checkpoint_list = checkpoint_list_head.next;
+			checkpoint_list != &checkpoint_list_head;
+			checkpoint_list = checkpoint_list->next) {
+
+			checkpoint = list_entry (checkpoint_list,
+				struct saCkptCheckpoint, list);
+			index = findProcessorIndex (&left_list[i], checkpoint->ckpt_refcount);
+			if (index == -1) {
+				continue;
+			}
+
+			/*
+			 * Never let referenceCount go negative; it is asserted >= 0 on close.
+			 */
+			if (checkpoint->referenceCount >= checkpoint->ckpt_refcount[index].count) {
+				checkpoint->referenceCount -= checkpoint->ckpt_refcount[index].count;
+			} else {
+				log_printf (LOG_LEVEL_ERROR,
+					"CKPT: reference count %d of checkpoint %p is below the departed processor's count\n",
+					checkpoint->referenceCount, checkpoint);
+				checkpoint->referenceCount = 0;
+			}
+			checkpoint->ckpt_refcount[index].count = 0;
+			memset (&checkpoint->ckpt_refcount[index].addr, 0, sizeof (struct in_addr));
+		}
+	}
+}
+
+/* REGULAR: remember ring_id.  TRANSITIONAL: drop departed refs, queue recovery. */
+static int ckpt_confchg_fn (
+	enum totempg_configuration_type configuration_type,
+	struct in_addr *member_list, void *member_list_private,
+		int member_list_entries,
+	struct in_addr *left_list, void *left_list_private,
+		int left_list_entries,
+	struct in_addr *joined_list, void *joined_list_private,
+		int joined_list_entries,
+	struct memb_ring_id *ring_id) {
+	switch (configuration_type) {
+	case TOTEMPG_CONFIGURATION_REGULAR:
+#ifdef TODO
+		totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
+#endif
+		saved_ring_id = *ring_id;
+		break;
+	case TOTEMPG_CONFIGURATION_TRANSITIONAL:
+		ckpt_recovery_process_members_exit (left_list, left_list_entries);
+		ckpt_recovery_inititialize ();
+		break;
+	}
+	return (0);
+}
 
 static struct saCkptCheckpoint *ckpt_checkpoint_find_global (SaNameT *name)
 {
-	struct list_head *checkpointList;
+	struct list_head *checkpoint_list;
 	struct saCkptCheckpoint *checkpoint;
 
-   for (checkpointList = checkpointListHead.next;
-        checkpointList != &checkpointListHead;
-        checkpointList = checkpointList->next) {
+   for (checkpoint_list = checkpoint_list_head.next;
+        checkpoint_list != &checkpoint_list_head;
+        checkpoint_list = checkpoint_list->next) {
 
-        checkpoint = list_entry (checkpointList,
+        checkpoint = list_entry (checkpoint_list,
             struct saCkptCheckpoint, list);
 
 		if (name_match (name, &checkpoint->name)) {
@@ -320,15 +723,15 @@
 	char *id,
 	int idLen)
 {
-	struct list_head *checkpointSectionList;
+	struct list_head *checkpoint_section_list;
 	struct saCkptCheckpointSection *ckptCheckpointSection;
 
 	log_printf (LOG_LEVEL_DEBUG, "Finding checkpoint section id %s %d\n", id, idLen);
-	for (checkpointSectionList = ckptCheckpoint->checkpointSectionsListHead.next;
-		checkpointSectionList != &ckptCheckpoint->checkpointSectionsListHead;
-		checkpointSectionList = checkpointSectionList->next) {
+	for (checkpoint_section_list = ckptCheckpoint->checkpointSectionsListHead.next;
+		checkpoint_section_list != &ckptCheckpoint->checkpointSectionsListHead;
+		checkpoint_section_list = checkpoint_section_list->next) {
 
-		ckptCheckpointSection = list_entry (checkpointSectionList,
+		ckptCheckpointSection = list_entry (checkpoint_section_list,
 			struct saCkptCheckpointSection, list);
 	
 		log_printf (LOG_LEVEL_DEBUG, "Checking section id %*s\n", 
@@ -404,9 +807,12 @@
 
 static int ckpt_exec_init_fn (void)
 {
-	// Initialize the saved ring ID.
+	/*
+	 *  Initialize the saved ring ID.
+	 */
 	saved_ring_id.seq = 0;
-	saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;	
+	saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;		
+	
 #ifdef TODO
 	int res;
 	res = totempg_recovery_plug_create (&ckpt_checkpoint_recovery_plug_handle);
@@ -445,9 +851,11 @@
 	}
 
 #ifdef TODO
-/* todo close section iterators
+/* TODO close section iterators
+ */
+/* 
+ * TODO what about exit of open checkpoints
  */
-// TODO what about exit of open checkpoints
 
 	if (conn_info->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries) {
 		free (conn_info->ais_ci.u.libckpt_ci.sectionIterator.sectionIteratorEntries);
@@ -511,10 +919,11 @@
 		ckptCheckpoint->unlinked = 0;
 		list_init (&ckptCheckpoint->list);
 		list_init (&ckptCheckpoint->checkpointSectionsListHead);
-		list_add (&ckptCheckpoint->list, &checkpointListHead);
+		list_add (&ckptCheckpoint->list, &checkpoint_list_head);
 		ckptCheckpoint->referenceCount = 0;
 		ckptCheckpoint->retention_timer = 0;
 		ckptCheckpoint->expired = 0;
+		initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount);
 
 		/*
 		 * Add in default checkpoint section
@@ -530,7 +939,7 @@
 		ckptCheckpointSection->sectionDescriptor.sectionSize = 0;
 		ckptCheckpointSection->sectionDescriptor.expirationTime = SA_TIME_END;
 		ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
-		ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; // current time
+		ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; /*current time*/
 		ckptCheckpointSection->sectionData = 0;
 		ckptCheckpointSection->expiration_timer = 0;
 	}
@@ -548,7 +957,26 @@
 	 */
 	log_printf (LOG_LEVEL_DEBUG, "CHECKPOINT opened is %p\n", ckptCheckpoint);
 	ckptCheckpoint->referenceCount += 1;
-
+	
+	/*
+	 * Add the connection reference information to the Checkpoint to be
+	 * sent out later as a part of the sync process.
+	 * 
+	 */
+	 
+	 int proc_index = findProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+	 if (proc_index == -1) {/* Could not find, lets set the processor to an index.*/
+	 	proc_index = setProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+	 }
+	 if (proc_index != -1 ) {	 
+	 	ckptCheckpoint->ckpt_refcount[proc_index].addr = source_addr;
+	 	ckptCheckpoint->ckpt_refcount[proc_index].count++;
+	 }
+	 else {
+	 	log_printf (LOG_LEVEL_ERROR, 
+	 				"CKPT: MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n", 
+	 				ckptCheckpoint);
+	 }
 	/*
 	 * Reset retention duration since this checkpoint was just opened
 	 */
@@ -581,10 +1009,62 @@
 			sizeof (struct res_lib_ckpt_checkpointopen));
 	}
 
-//	return (error == SA_AIS_OK ? 0 : -1);
+/*	return (error == SA_AIS_OK ? 0 : -1); */
+	return (0);
+}
+
+/* Apply a SYNCHRONIZESTATE message: open the checkpoint for the sender, then
+ * recreate the section.  NOTE(review): ref_count is unused; each message
+ * (one per section) adds one reference through the open handler. */
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required) {
+	int retcode;
+	struct req_exec_ckpt_checkpointopen request_open_exec;
+	struct req_exec_ckpt_synchronize_state *req_exec_ckpt_sync_state
+					= (struct req_exec_ckpt_synchronize_state *)message;
+	/*
+	 * If the Incoming message's previous ring id == saved_ring_id
+	 * Ignore because we have seen this message before.
+	 */
+	if (memcmp (&req_exec_ckpt_sync_state->previous_ring_id, &saved_ring_id, sizeof (struct memb_ring_id)) == 0) {
+		return (0);
+	}
+	/* Zero the request so fields not set below are not uninitialized stack data. */
+	memset (&request_open_exec, 0, sizeof (request_open_exec));
+	request_open_exec.req_lib_ckpt_checkpointopen.checkpointName = req_exec_ckpt_sync_state->checkpointName;
+	request_open_exec.req_lib_ckpt_checkpointopen.checkpointCreationAttributes = req_exec_ckpt_sync_state->checkpointCreationAttributes;
+	message_handler_req_exec_ckpt_checkpointopen (&request_open_exec, req_exec_ckpt_sync_state->source_addr, 0);
+
+	retcode = recovery_section_create (&req_exec_ckpt_sync_state->sectionDescriptor,
+		&req_exec_ckpt_sync_state->checkpointName);
+	if (retcode != SA_AIS_OK) {
+		log_printf(LOG_LEVEL_ERROR, "CKPT: message_handler_req_exec_ckpt_synchronize_state\n");
+		log_printf(LOG_LEVEL_ERROR, "CKPT: recovery_section_create returned %d\n",retcode);
+	}
+
 	return (0);
 }
 
+/*
+ * Apply a SYNCHRONIZESECTION message: the section data follows the request
+ * struct in the message and is written into the named checkpoint section at
+ * dataOffSet via recovery_section_write.  Failures are only logged.
+ */
+static int message_handler_req_exec_ckpt_synchronize_section (void *message, struct in_addr source_addr, int endian_conversion_required) {
+	struct req_exec_ckpt_synchronize_section *sync_section = message;
+	char *payload = (char *)message + sizeof (struct req_exec_ckpt_synchronize_section);
+	int result;
+
+	result = recovery_section_write (&sync_section->sectionId,
+		&sync_section->checkpointName, payload,
+		sync_section->dataOffSet, sync_section->dataSize);
+	if (result == SA_AIS_OK) {
+		return (0);
+	}
+	log_printf(LOG_LEVEL_ERROR, "CKPT: message_handler_req_exec_ckpt_synchronize_section\n");
+	log_printf(LOG_LEVEL_ERROR, "CKPT: recovery_section_write returned %d\n",result);
+	return (0);
+}
+
+
 unsigned int abstime_to_msec (SaTimeT time)
 {
 	struct timeval tv;
@@ -644,6 +1124,20 @@
 	}
 
 	checkpoint->referenceCount--;
+	/*
+	 * Modify the connection reference information to the Checkpoint to be
+	 * sent out later as a part of the sync process.	 
+	 */
+	
+	int proc_index = findProcessorIndex(&source_addr, checkpoint->ckpt_refcount);
+	if (proc_index != -1 ) {	 		
+	 	checkpoint->ckpt_refcount[proc_index].count--;
+	}
+	else {
+		log_printf (LOG_LEVEL_ERROR, 
+	 				"CKPT: Could Not find Processor Info %p info.\n", 
+	 				checkpoint);
+	}
 	assert (checkpoint->referenceCount >= 0);
 	log_printf (LOG_LEVEL_DEBUG, "disconnect called, new CKPT ref count is %d\n", 
 		checkpoint->referenceCount);
@@ -652,10 +1146,10 @@
 	 * If checkpoint has been unlinked and this is the last reference, delete it
 	 */
 	if (checkpoint->unlinked && checkpoint->referenceCount == 0) {
-		log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
+		log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");		
 		checkpoint_release (checkpoint);
 	} else
-	if (checkpoint->referenceCount == 0) {
+	if (checkpoint->referenceCount == 0) {		
 		poll_timer_add (aisexec_poll_handle,
 			checkpoint->checkpointCreationAttributes.retentionDuration / 1000000,
 			checkpoint,
@@ -791,6 +1285,96 @@
 	return (0);
 }
 
+/*
+ * Recreate a checkpoint section described by a SYNCHRONIZESTATE message.
+ * The section gets zero-filled data of sectionDescriptor->sectionSize bytes
+ * (filled in later by SYNCHRONIZESECTION messages) and a private copy of the
+ * section id; its expiration timer is armed unless the expiration time is
+ * SA_TIME_END.  Returns SA_AIS_OK, SA_AIS_ERR_NOT_EXIST (no such
+ * checkpoint), SA_AIS_ERR_EXIST (section id already present) or
+ * SA_AIS_ERR_NO_MEMORY.
+ * NOTE(review): sectionDescriptor->sectionId.id is dereferenced as a pointer;
+ * if the descriptor was copied from another node's memory it is not valid
+ * here -- the id bytes should be carried in the message itself.
+ */
+static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor,
+									SaNameT *checkpointName) {
+	struct saCkptCheckpoint *ckptCheckpoint;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	void *initialData;
+	void *sectionId;
+	SaErrorT error = SA_AIS_OK;
+
+	ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName);
+	if (ckptCheckpoint == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	/*
+	 * Determine if user-specified checkpoint ID already exists
+	 */
+	ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint,
+								((char *)sectionDescriptor->sectionId.id),
+								(int)sectionDescriptor->sectionId.idLen);
+	if (ckptCheckpointSection) {
+		error = SA_AIS_ERR_EXIST;
+		goto error_exit;
+	}
+
+	/*
+	 * Allocate the section, its zero-filled data and its id.  malloc/calloc
+	 * may legitimately return NULL for a zero size, so NULL only counts as
+	 * failure when the requested size is non-zero.
+	 */
+	ckptCheckpointSection = malloc (sizeof (struct saCkptCheckpointSection));
+	initialData = calloc (1, sectionDescriptor->sectionSize);
+	sectionId = malloc (sectionDescriptor->sectionId.idLen);
+	if (ckptCheckpointSection == 0 ||
+		(initialData == 0 && sectionDescriptor->sectionSize != 0) ||
+		(sectionId == 0 && sectionDescriptor->sectionId.idLen != 0)) {
+		free (ckptCheckpointSection);
+		free (initialData);
+		free (sectionId);
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+	if (sectionDescriptor->sectionId.idLen != 0) {
+		memcpy (sectionId, sectionDescriptor->sectionId.id,
+			sectionDescriptor->sectionId.idLen);
+	}
+
+	/*
+	 * Configure the section from the descriptor, then point its id at the
+	 * private copy instead of the caller's buffer (otherwise the copy leaks).
+	 */
+	memcpy (&ckptCheckpointSection->sectionDescriptor,
+		sectionDescriptor,
+		sizeof (SaCkptSectionDescriptorT));
+	ckptCheckpointSection->sectionDescriptor.sectionId.id = sectionId;
+	ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
+	ckptCheckpointSection->sectionData = initialData;
+	ckptCheckpointSection->expiration_timer = 0;
+
+	if (sectionDescriptor->expirationTime != SA_TIME_END) {
+		poll_timer_add (aisexec_poll_handle,
+			abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime),
+			ckptCheckpointSection,
+			timer_function_section_expire,
+			&ckptCheckpointSection->expiration_timer);
+	}
+
+	/*
+	 * Add checkpoint section to checkpoint
+	 */
+	list_init (&ckptCheckpointSection->list);
+	list_add (&ckptCheckpointSection->list,
+		&ckptCheckpoint->checkpointSectionsListHead);
+
+error_exit:
+	return (error);
+}
+
 static int message_handler_req_exec_ckpt_sectioncreate (void *message, struct in_addr source_addr, int endian_conversion_required) {
 	struct req_exec_ckpt_sectioncreate *req_exec_ckpt_sectioncreate = (struct req_exec_ckpt_sectioncreate *)message;
 	struct req_lib_ckpt_sectioncreate *req_lib_ckpt_sectioncreate = (struct req_lib_ckpt_sectioncreate *)&req_exec_ckpt_sectioncreate->req_lib_ckpt_sectioncreate;
@@ -804,7 +1388,7 @@
 	log_printf (LOG_LEVEL_DEBUG, "Executive request to create a checkpoint section.\n");
 	ckptCheckpoint = ckpt_checkpoint_find_global (&req_exec_ckpt_sectioncreate->checkpointName);
 	if (ckptCheckpoint == 0) {
-		error = SA_AIS_ERR_LIBRARY; // TODO find the right error for this
+		error = SA_AIS_ERR_LIBRARY; /* TODO find the right error for this*/
 		goto error_exit;
 	}
 
@@ -867,7 +1451,7 @@
 	ckptCheckpointSection->sectionDescriptor.sectionSize = req_lib_ckpt_sectioncreate->initialDataSize;
 	ckptCheckpointSection->sectionDescriptor.expirationTime = req_lib_ckpt_sectioncreate->expirationTime;
 	ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
-	ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; // TODO current time
+	ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; /* TODO current time */
 	ckptCheckpointSection->sectionData = initialData;
 	ckptCheckpointSection->expiration_timer = 0;
 
@@ -1015,6 +1599,58 @@
 	return (0);
 }
 
+/*
+ * Copy dataSize bytes from newData into the named checkpoint section at
+ * dataOffSet (the payload of a SYNCHRONIZESECTION message).  The write must
+ * lie entirely within the section's current sectionSize.
+ * Returns SA_AIS_OK, SA_AIS_ERR_NOT_EXIST (checkpoint or section missing)
+ * or SA_AIS_ERR_ACCESS (write would extend past the end of the section).
+ */
+static int recovery_section_write(SaCkptSectionIdT *sectionId,
+									SaNameT *checkpointName,
+									void *newData,
+									SaUint32T dataOffSet,
+									SaUint32T dataSize) {
+	struct saCkptCheckpoint *ckptCheckpoint;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	SaSizeT sectionSize;
+	SaErrorT error = SA_AIS_OK;
+
+	log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_section_write.\n");
+	ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName);
+	if (ckptCheckpoint == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	/*
+	 * Find checkpoint section to be written
+	 */
+	ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint,
+								((char *)sectionId->id),
+								(int)sectionId->idLen);
+	if (ckptCheckpointSection == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	/*
+	 * Reject writes past the end of the section.  Offset and size are checked
+	 * separately so a wrapped dataOffSet + dataSize cannot slip through.
+	 */
+	sectionSize = ckptCheckpointSection->sectionDescriptor.sectionSize;
+	if (dataOffSet > sectionSize || dataSize > sectionSize - dataOffSet) {
+		error = SA_AIS_ERR_ACCESS;
+		goto error_exit;
+	}
+	if (dataSize > 0) {
+		memcpy ((char *)ckptCheckpointSection->sectionData + dataOffSet, newData, dataSize);
+	}
+error_exit:
+	return (error);
+}
+
+
 static int message_handler_req_exec_ckpt_sectionwrite (void *message, struct in_addr source_addr, int endian_conversion_required) {
 	struct req_exec_ckpt_sectionwrite *req_exec_ckpt_sectionwrite = (struct req_exec_ckpt_sectionwrite *)message;
 	struct req_lib_ckpt_sectionwrite *req_lib_ckpt_sectionwrite = (struct req_lib_ckpt_sectionwrite *)&req_exec_ckpt_sectionwrite->req_lib_ckpt_sectionwrite;
@@ -1032,7 +1668,9 @@
 		goto error_exit;
 	}
 
-//printf ("writing checkpoint section is %s\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite));
+/*
+	printf ("writing checkpoint section is %s\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite));
+*/
 	/*
 	 * Find checkpoint section to be written
 	 */
@@ -1147,7 +1785,7 @@
 	 */
 	ckptCheckpointSection->sectionDescriptor.sectionSize = req_lib_ckpt_sectionoverwrite->dataSize;
 	ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
-	ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; // TODO current time
+	ckptCheckpointSection->sectionDescriptor.lastUpdate = 0; /* TODO current time */
 	ckptCheckpointSection->sectionData = sectionData;
 
 	/*
@@ -1178,7 +1816,7 @@
 
 	ckptCheckpoint = ckpt_checkpoint_find_global (&req_exec_ckpt_sectionread->checkpointName);
 	if (ckptCheckpoint == 0) {
-		error = SA_AIS_ERR_LIBRARY; // TODO find the right error for this
+		error = SA_AIS_ERR_LIBRARY; /* TODO find the right error for this */
 		goto error_exit;
 	}
 
@@ -1258,7 +1896,7 @@
 		conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorCount = 0;
 		conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorPos = 0;
 		list_add (&conn_info->ais_ci.u.libckpt_ci.sectionIterator.list,
-			&checkpointIteratorListHead);
+			&checkpoint_iterator_list_head);
 		list_init (&conn_info->ais_ci.u.libckpt_ci.checkpoint_list);
 		error = SA_AIS_OK;
 	}
@@ -1400,7 +2038,7 @@
 	struct saCkptCheckpoint *checkpoint;
 	int memoryUsed = 0;
 	int numberOfSections = 0;
-	struct list_head *checkpointSectionList;
+	struct list_head *checkpoint_section_list;
 	struct saCkptCheckpointSection *checkpointSection;
 
 	log_printf (LOG_LEVEL_DEBUG, "in status get\n");
@@ -1410,11 +2048,11 @@
 	 */
 	checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_checkpointstatusget->checkpointName);
 
-	for (checkpointSectionList = checkpoint->checkpointSectionsListHead.next;
-		checkpointSectionList != &checkpoint->checkpointSectionsListHead;
-		checkpointSectionList = checkpointSectionList->next) {
+	for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next;
+		checkpoint_section_list != &checkpoint->checkpointSectionsListHead;
+		checkpoint_section_list = checkpoint_section_list->next) {
 
-		checkpointSection = list_entry (checkpointSectionList,
+		checkpointSection = list_entry (checkpoint_section_list,
 			struct saCkptCheckpointSection, list);
 
 		memoryUsed += checkpointSection->sectionDescriptor.sectionSize;
@@ -1614,7 +2252,9 @@
 	iovecs[1].iov_base = ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite);
 	iovecs[1].iov_len = req_lib_ckpt_sectionwrite->header.size - sizeof (struct req_lib_ckpt_sectionwrite);
 
-//printf ("LIB writing checkpoint section is %s\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite));
+/*
+	printf ("LIB writing checkpoint section is %s\n", ((char *)req_lib_ckpt_sectionwrite) + sizeof (struct req_lib_ckpt_sectionwrite));
+*/
 	if (iovecs[1].iov_len > 0) {
 		assert (totempg_mcast (iovecs, 2, TOTEMPG_AGREED) == 0);
 	} else {
@@ -1730,7 +2370,7 @@
 	struct saCkptCheckpointSection *ckptCheckpointSection;
 	struct saCkptSectionIteratorEntry *ckptSectionIteratorEntries;
 	struct saCkptSectionIterator *ckptSectionIterator;
-	struct list_head *checkpointSectionList;
+	struct list_head *checkpoint_section_list;
 	int addEntry = 0;
 	int iteratorEntries = 0;
 	SaErrorT error = SA_AIS_OK;
@@ -1747,11 +2387,11 @@
 	/*
 	 * Iterate list of checkpoint sections
 	 */
-	for (checkpointSectionList = ckptCheckpoint->checkpointSectionsListHead.next;
-		checkpointSectionList != &ckptCheckpoint->checkpointSectionsListHead;
-		checkpointSectionList = checkpointSectionList->next) {
+	for (checkpoint_section_list = ckptCheckpoint->checkpointSectionsListHead.next;
+		checkpoint_section_list != &ckptCheckpoint->checkpointSectionsListHead;
+		checkpoint_section_list = checkpoint_section_list->next) {
 
-		ckptCheckpointSection = list_entry (checkpointSectionList,
+		ckptCheckpointSection = list_entry (checkpoint_section_list,
 			struct saCkptCheckpointSection, list);
 
 		addEntry = 1;
diff -uNr --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet --exclude=init --exclude=LICENSE --exclude=Makefile --exclude=man --exclude=README.devmap --exclude=SECURITY --exclude=TODO --exclude=CHANGELOG --exclude=conf --exclude=loc --exclude=Makefile.samples --exclude=QUICKSTART --exclude=test --exclude=.cdtproject --exclude=.project ../latest/include/ipc_ckpt.h ../bk_openais/include/ipc_ckpt.h
--- ../latest/include/ipc_ckpt.h	2005-02-25 14:07:12.000000000 -0600
+++ ../bk_openais/include/ipc_ckpt.h	2005-02-24 14:55:42.000000000 -0600
@@ -37,6 +37,8 @@
 #include "../include/ipc_gen.h"
 #include "../include/ais_types.h"
 #include "../include/saCkpt.h"
+#include "../exec/totemsrp.h"
+#include "../exec/ckpt.h"
 
 enum req_lib_ckpt_checkpoint_types {
 	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN = 1,
@@ -326,4 +328,37 @@
 	struct res_header header;
 };
 
+/*
+ * Synchronization message describing one checkpoint and one section
+ * descriptor of it.
+ * NOTE(review): presumably sent as MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE
+ * during recovery after a membership change; confirm against exec/ckpt.c.
+ */
+struct req_exec_ckpt_synchronize_state {
+	struct req_header header;
+	/* assumed: ring the sender belonged to before the change - confirm */
+	struct memb_ring_id previous_ring_id;
+	SaNameT checkpointName;
+	SaCkptCheckpointCreationAttributesT checkpointCreationAttributes;
+	SaCkptSectionDescriptorT sectionDescriptor;
+	/* assumed: address of the node the state originates from - confirm */
+	struct in_addr source_addr;
+	SaUint32T ref_count;
+};
+
+/*
+ * Synchronization message locating a byte range inside one checkpoint
+ * section.
+ * NOTE(review): presumably sent as MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION
+ * with dataSize bytes of section data following this header, to be written
+ * at dataOffSet; confirm against the sender in exec/ckpt.c.
+ */
+struct req_exec_ckpt_synchronize_section {
+	struct req_header header;
+	SaNameT checkpointName;
+	SaCkptSectionIdT sectionId;
+	SaUint32T dataOffSet;
+	SaUint32T dataSize;
+};
+
 #endif /* IPC_CKPT_H_DEFINED */
diff -uNr --exclude=SCCS --exclude=BitKeeper --exclude=ChangeSet --exclude=init --exclude=LICENSE --exclude=Makefile --exclude=man --exclude=README.devmap --exclude=SECURITY --exclude=TODO --exclude=CHANGELOG --exclude=conf --exclude=loc --exclude=Makefile.samples --exclude=QUICKSTART --exclude=test --exclude=.cdtproject --exclude=.project ../latest/include/ipc_gen.h ../bk_openais/include/ipc_gen.h
--- ../latest/include/ipc_gen.h	2005-02-25 14:07:12.000000000 -0600
+++ ../bk_openais/include/ipc_gen.h	2005-02-24 13:42:21.000000000 -0600
@@ -69,6 +69,8 @@
 	MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE,
 	MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE,
 	MESSAGE_REQ_EXEC_CKPT_SECTIONREAD,
+	MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE,
+	MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESECTION,
 	MESSAGE_REQ_EXEC_EVT_EVENTDATA,
 	MESSAGE_REQ_EXEC_EVT_CHANCMD,
 	MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA


More information about the Openais mailing list