[Openais] Patch II design review revision 1 !!

Muni Bajpai muniba at nortel.com
Wed Feb 23 12:18:32 PST 2005


Skipped content of type multipart/alternative
-------------- next part --------------
--- ../latest/exec/ckpt.c	2005-02-23 12:56:16.000000000 -0600
+++ ../bk_openais/exec/ckpt.c	2005-02-23 13:02:46.000000000 -0600
@@ -16,7 +16,7 @@
  *   this list of conditions and the following disclaimer in the documentation
  *   and/or other materials provided with the distribution.
  * - Neither the name of the MontaVista Software, Inc. nor the names of its
  *   contributors may be used to endorse or promote products derived from this
  *   software without specific prior written permission.
  *
  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
@@ -57,17 +56,30 @@
 #include "totempg.h"
 
 #define LOG_SERVICE LOG_SERVICE_CKPT
+#define CKPT_MAX_SECTION_DATA_SEND (1024*400)
 #include "print.h"
 
-DECLARE_LIST_INIT(checkpointListHead);
+DECLARE_LIST_INIT(checkpoint_list_head);
 
-DECLARE_LIST_INIT(checkpointIteratorListHead);
+DECLARE_LIST_INIT(checkpoint_iterator_list_head);
+
+DECLARE_LIST_INIT(recovery_sync_list_head);
+
+DECLARE_LIST_INIT(checkpoint_recovery_list_head);
 
 struct checkpoint_cleanup {
     struct list_head list;
     struct saCkptCheckpoint *checkpoint;
 };
 
+struct checkpoint_sync {	/* one queued state-synchronization message */
+	struct list_head list;	/* linkage on recovery_sync_list_head */
+	struct req_exec_ckpt_synchronize_state request;	/* message body multicast by ckpt_recovery_process */
+};
+#ifdef TODO
+static void *tok_call_handle = 0;
+#endif
+
 //TODO static totempg_recovery_plug_handle ckpt_checkpoint_recovery_plug_handle;
 
 static int ckpt_exec_init_fn (void);
@@ -78,6 +90,8 @@
 
 static int message_handler_req_exec_ckpt_checkpointopen (void *message, struct in_addr source_addr, int endian_conversion_required);
 
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required);
+
 static int message_handler_req_exec_ckpt_checkpointclose (void *message, struct in_addr source_addr, int endian_conversion_required);
 
 static int message_handler_req_exec_ckpt_checkpointunlink (void *message, struct in_addr source_addr, int endian_conversion_required);
@@ -133,24 +147,28 @@
 static int message_handler_req_lib_ckpt_sectioniteratorinitialize (struct conn_info *conn_info, void *message);
 static int message_handler_req_lib_ckpt_sectioniteratornext (struct conn_info *conn_info, void *message);
 
-static int ckpt_confchg_fn (
-	enum totempg_configuration_type configuration_type,
-	struct in_addr *member_list, void *member_list_private,
-		int member_list_entries,
-	struct in_addr *left_list, void *left_list_private,
-		int left_list_entries,
-	struct in_addr *joined_list, void *joined_list_private,
-		int joined_list_entries,
-	struct memb_ring_id *ring_id) {
+//RECOVERY_MUNI
+static void ckpt_recovery_inititialize ();
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *);
+static int ckpt_recovery_finalize();
+static int ckpt_recovery_abort();
+static int ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries);
+static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor, SaNameT *checkpointName);
+static int recovery_section_write(SaCkptSectionDescriptorT *sectionDescriptor, SaNameT *checkpointName,
+									void *newData, SaUint32T dataOffSet, SaUint32T dataSize);
 
-#ifdef TODO
-	if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
-		totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
-	}
-#endif
 
-	return (0);
-}
+static struct memb_ring_id saved_ring_id;
+
+static int ckpt_confchg_fn(
+		enum totempg_configuration_type configuration_type,
+		struct in_addr *member_list, void *member_list_private,
+			int member_list_entries,
+		struct in_addr *left_list, void *left_list_private,
+			int left_list_entries,
+		struct in_addr *joined_list, void *joined_list_private,
+			int joined_list_entries,
+		struct memb_ring_id *ring_id);
 
 struct libais_handler ckpt_libais_handlers[] =
 {
@@ -258,7 +276,8 @@
 	message_handler_req_exec_ckpt_sectionexpirationtimeset,
 	message_handler_req_exec_ckpt_sectionwrite,
 	message_handler_req_exec_ckpt_sectionoverwrite,
-	message_handler_req_exec_ckpt_sectionread
+	message_handler_req_exec_ckpt_sectionread,
+	message_handler_req_exec_ckpt_synchronize_state
 };
 
 struct service_handler ckpt_service_handler = {
@@ -273,18 +292,289 @@
 	.exec_dump_fn				= 0
 };
 
-static struct memb_ring_id saved_ring_id;
+// Claim the first free slot of a checkpoint's reference table for proc_addr.
+// Returns the claimed index, or -1 when proc_addr is met before any free slot
+// or the table has no free slot.  A slot is free when its address is 0.
+static int setProcessorIndex(struct in_addr *proc_addr, 
+								struct ckpt_refcnt *ckpt_refcount) { //RECOVERY_MUNI
+	int slot = 0;
+
+	while (slot < PROCESSOR_COUNT_MAX) {
+		struct ckpt_refcnt *entry = &ckpt_refcount[slot];
+
+		// Already registered: report failure rather than add a duplicate.
+		if (entry->addr.s_addr == proc_addr->s_addr) {
+			return -1;
+		}
+		// Unused entry: record the address here and hand back its position.
+		if (entry->addr.s_addr == 0) {
+			entry->addr = *proc_addr;
+			return slot;
+		}
+		slot++;
+	}
+
+	// No slot was free for this processor.
+	return -1;
+}
+
+// Look up proc_addr in a reference table; yields its index or -1 if absent.
+static int findProcessorIndex(struct in_addr *proc_addr,
+								struct ckpt_refcnt *ckpt_refcount) { //RECOVERY_MUNI
+	int found = -1;
+	int slot;
+	// Walk forward and stop at the first entry whose address matches.
+	for (slot = 0; found == -1 && slot < PROCESSOR_COUNT_MAX; slot++) {
+		if (ckpt_refcount[slot].addr.s_addr == proc_addr->s_addr) {
+			found = slot;
+		}
+	}
+	return found;
+}
+
+static void initialize_ckpt_refcount_array (struct ckpt_refcnt *ckpt_refcount) { // zero all PROCESSOR_COUNT_MAX slots
+	memset(ckpt_refcount, 0, sizeof(*ckpt_refcount) * PROCESSOR_COUNT_MAX);
+}
+
+// Snapshot local checkpoint state when recovery starts.  Every checkpoint on
+// checkpoint_list_head is shallow-copied onto checkpoint_recovery_list_head and
+// each section's data is queued on recovery_sync_list_head as sync messages
+// carrying at most CKPT_MAX_SECTION_DATA_SEND bytes each.
+// NOTE(review): the memcpy'd copy still references the original's section list
+// nodes - confirm that is safe once ckpt_recovery_finalize frees the originals.
+static void ckpt_recovery_inititialize () {
+	struct list_head *checkpoint_list;
+	struct saCkptCheckpoint *checkpoint;
+	struct checkpoint_sync *sync_msg;
+	struct saCkptCheckpoint *savedCheckpoint;
+	struct list_head *checkpoint_section_list;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	for (checkpoint_list = checkpoint_list_head.next;
+		checkpoint_list != &checkpoint_list_head;
+		checkpoint_list = checkpoint_list->next) {
+		checkpoint = list_entry (checkpoint_list,
+			struct saCkptCheckpoint, list);
+
+		//Save off the elements in the new list
+		savedCheckpoint = malloc (sizeof(struct saCkptCheckpoint));
+		assert(savedCheckpoint);
+		memcpy(savedCheckpoint, checkpoint, sizeof(struct saCkptCheckpoint));
+		list_init(&savedCheckpoint->list);
+		list_add(&savedCheckpoint->list,&checkpoint_recovery_list_head);
+		SaUint32T sync_sequence_number = 0;
+
+		// This processor's reference slot is the same for every chunk, so it
+		// is looked up once.  Without a slot nothing is queued for this
+		// checkpoint; retrying per chunk without advancing would never end.
+		int proc_index = findProcessorIndex(&this_ip.sin_addr,savedCheckpoint->ckpt_refcount);
+		if (proc_index == -1) {
+			log_printf (LOG_LEVEL_ERROR, 
+				"CKPT: Could not find the checkpoint entry for this processor %p \n", 
+				checkpoint);
+			continue;
+		}
+
+		for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next;
+			checkpoint_section_list != &checkpoint->checkpointSectionsListHead;
+			checkpoint_section_list = checkpoint_section_list->next) {
+			ckptCheckpointSection = list_entry (checkpoint_section_list,
+				struct saCkptCheckpointSection, list);
+			SaSizeT origSectionSize = ckptCheckpointSection->sectionDescriptor.sectionSize;
+			SaSizeT sectionDataSent = 0;
+			SaSizeT newSectionSize = 0;
+
+			while (sectionDataSent < origSectionSize) {
+				// Send a Max of 400K of section data per message
+				if ((origSectionSize - sectionDataSent) > CKPT_MAX_SECTION_DATA_SEND) {
+					newSectionSize = CKPT_MAX_SECTION_DATA_SEND;
+				} else {
+					newSectionSize = (origSectionSize - sectionDataSent);
+				}
+
+				// Queue one message.  The chunk is stored directly behind the
+				// struct so request.sectionData points at memory owned by (and
+				// freed with) sync_msg instead of being left uninitialized.
+				sync_msg = malloc(sizeof(struct checkpoint_sync) + newSectionSize);
+				assert(sync_msg);
+				sync_msg->request.sectionData = sync_msg + 1;
+				memcpy(&sync_msg->request.checkpointName, &savedCheckpoint->name, sizeof(SaNameT));
+				memcpy(&sync_msg->request.checkpointCreationAttributes, 
+						&savedCheckpoint->checkpointCreationAttributes, 
+						sizeof(SaCkptCheckpointCreationAttributesT));
+				memcpy(&sync_msg->request.sectionDescriptor,
+						&ckptCheckpointSection->sectionDescriptor,
+						sizeof(SaCkptSectionDescriptorT));
+				memcpy((char*)sync_msg->request.sectionData, 
+						((char*)ckptCheckpointSection->sectionData + sectionDataSent),
+						newSectionSize);
+				sync_msg->request.dataOffSet = (SaUint32T)sectionDataSent;	// assign, not memcpy: the source is SaSizeT
+				sync_msg->request.dataSize = (SaUint32T)newSectionSize;
+				memcpy(&sync_msg->request.previous_ring_id, &saved_ring_id, sizeof(struct memb_ring_id));
+				memcpy(&sync_msg->request.source_addr, &this_ip.sin_addr, sizeof(struct in_addr));
+				sync_msg->request.sync_msg_sequence_number = sync_sequence_number;
+				sync_msg->request.ref_count = savedCheckpoint->ckpt_refcount[proc_index].count;
+				sync_sequence_number++;
+				sectionDataSent += newSectionSize;
+				list_init(&sync_msg->list);
+				list_add(&sync_msg->list,&recovery_sync_list_head);
+			}
+		}
+	}
+#ifdef TODO
+	int res = totempg_token_callback_create(&tok_call_handle, 
+										TOTEMPG_CALLBACK_TOKEN_SENT,
+										0,
+										ckpt_recovery_process,
+										NULL);
+#endif
+}
+
+// Token callback: multicast the message at the front of recovery_sync_list_head,
+// then unlink and free it.  Returns 0 on every path.
+// NOTE(review): only the fixed-size struct is handed to totempg_mcast, so the
+// bytes behind request.sectionData are not transmitted - confirm wire format.
+static int ckpt_recovery_process (enum totempg_callback_token_type type, void *data ) {
+	struct req_exec_ckpt_synchronize_state *request_exec_sync_state;
+	struct iovec iovecs[2];
+	struct checkpoint_sync *sync_element;
+	int mcast_result;
+
+	//Check for empty list here
+	if (list_empty(&recovery_sync_list_head)) {
+		return (0);
+	}
+	//Extract the element
+	sync_element = list_entry (recovery_sync_list_head.next,
+		struct checkpoint_sync, list);
+	request_exec_sync_state = &sync_element->request;
+	log_printf (LOG_LEVEL_DEBUG, "callback recover_process.\n");
+	request_exec_sync_state->header.size =
+		sizeof (struct req_exec_ckpt_synchronize_state);
+	request_exec_sync_state->header.id = MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE;
+	// iov_base is the message itself, not the address of the local pointer.
+	iovecs[0].iov_base = (char *)request_exec_sync_state;
+	iovecs[0].iov_len = sizeof (struct req_exec_ckpt_synchronize_state);
+	// Send outside assert() so the multicast survives an NDEBUG build.
+	mcast_result = totempg_mcast (iovecs, 1, TOTEMPG_AGREED);
+	assert (mcast_result == 0);
+	list_del(&sync_element->list);
+	free(sync_element);
+	if (list_empty(&recovery_sync_list_head)) {
+		//todo Call finalize here.
+		return (0);
+	}
+#ifdef TODO
+	//We have more to send ...
+	int res = totempg_token_callback_create(&tok_call_handle, 
+										TOTEMPG_CALLBACK_TOKEN_SENT,
+										0,
+										ckpt_recovery_process,
+										NULL);
+#endif
+	return (0);	// the function previously fell off the end without a value
+}
+
+// Finish recovery: free every checkpoint on checkpoint_list_head, then move the
+// saved copies over from checkpoint_recovery_list_head.  Nodes are re-linked one
+// by one; a memcpy of the head leaves neighbours aimed at the old head.  Returns 0.
+static int ckpt_recovery_finalize () {
+	struct saCkptCheckpoint *checkpoint;
+
+	while (!list_empty(&checkpoint_list_head)) {
+		checkpoint = list_entry (checkpoint_list_head.next,
+			struct saCkptCheckpoint, list);
+		list_del(&checkpoint->list);
+		free(checkpoint);
+	}
+	while (!list_empty(&checkpoint_recovery_list_head)) {
+		checkpoint = list_entry (checkpoint_recovery_list_head.next,
+			struct saCkptCheckpoint, list);
+		list_del(&checkpoint->list);
+		list_init(&checkpoint->list);
+		list_add(&checkpoint->list, &checkpoint_list_head);
+	}
+	return (0);
+}
+
+static int ckpt_recovery_abort () {	// Placeholder: performs no work yet.
+//@TODO add some code here to abort the recovery process
+	return (0);	// unconditionally reports 0
+}
+
+// Drop the bookkeeping of processors that left the membership.  For every
+// address in left_list, each checkpoint that has a reference slot for it gets
+// that slot cleared and its referenceCount lowered by one (never below 0).
+// Always returns 0.
+static int ckpt_recovery_process_members_exit(struct in_addr *left_list, int left_list_entries) {
+	struct list_head *checkpoint_list;
+	struct saCkptCheckpoint *checkpoint;
+	struct in_addr *member;
+	int i;
+
+	// Visit every departed member.  The original do/while stopped at
+	// left_list_entries - 1 and so never processed the last entry of a
+	// multi-entry list.
+	for (i = 0; i < left_list_entries; i++) {
+		member = &left_list[i];
+		for (checkpoint_list = checkpoint_list_head.next;
+			checkpoint_list != &checkpoint_list_head;
+			checkpoint_list = checkpoint_list->next) {
+
+			checkpoint = list_entry (checkpoint_list,
+				struct saCkptCheckpoint, list);
+
+			int index = findProcessorIndex(member, checkpoint->ckpt_refcount);
+			if (index == -1) {
+				continue;
+			}
+
+			// NOTE(review): only one reference is released even if the slot
+			// counted several opens - confirm against referenceCount usage.
+			checkpoint->ckpt_refcount[index].count = 0;
+			memset((char*)&checkpoint->ckpt_refcount[index].addr, 0, sizeof(struct in_addr));
+			if (checkpoint->referenceCount > 0) {
+				checkpoint->referenceCount--;
+			}
+		}
+	}
+	return(0);
+}
+
+// Membership change hook.  A regular configuration records ring_id in
+// saved_ring_id; a transitional one purges departed members and queues recovery.
+static int ckpt_confchg_fn (
+	enum totempg_configuration_type configuration_type,
+	struct in_addr *member_list, void *member_list_private,
+		int member_list_entries,
+	struct in_addr *left_list, void *left_list_private,
+		int left_list_entries,
+	struct in_addr *joined_list, void *joined_list_private,
+		int joined_list_entries,
+	struct memb_ring_id *ring_id) {
+	if (configuration_type == TOTEMPG_CONFIGURATION_REGULAR) {
+#ifdef TODO
+		totempg_recovery_plug_unplug (ckpt_checkpoint_recovery_plug_handle);
+#endif
+		memcpy (&saved_ring_id, ring_id, sizeof(saved_ring_id));	//RECOVERY_MUNI
+		return (0);
+	}
+	if (configuration_type == TOTEMPG_CONFIGURATION_TRANSITIONAL) {
+		ckpt_recovery_process_members_exit(left_list, left_list_entries);
+		ckpt_recovery_inititialize ();
+	}
+	return (0);
+}
 
 static struct saCkptCheckpoint *ckpt_checkpoint_find_global (SaNameT *name)
 {
-	struct list_head *checkpointList;
+	struct list_head *checkpoint_list;
 	struct saCkptCheckpoint *checkpoint;
 
-   for (checkpointList = checkpointListHead.next;
-        checkpointList != &checkpointListHead;
-        checkpointList = checkpointList->next) {
+   for (checkpoint_list = checkpoint_list_head.next;
+        checkpoint_list != &checkpoint_list_head;
+        checkpoint_list = checkpoint_list->next) {
 
-        checkpoint = list_entry (checkpointList,
+        checkpoint = list_entry (checkpoint_list,
             struct saCkptCheckpoint, list);
 
 		if (name_match (name, &checkpoint->name)) {
@@ -320,15 +610,15 @@
 	char *id,
 	int idLen)
 {
-	struct list_head *checkpointSectionList;
+	struct list_head *checkpoint_section_list;
 	struct saCkptCheckpointSection *ckptCheckpointSection;
 
 	log_printf (LOG_LEVEL_DEBUG, "Finding checkpoint section id %s %d\n", id, idLen);
-	for (checkpointSectionList = ckptCheckpoint->checkpointSectionsListHead.next;
-		checkpointSectionList != &ckptCheckpoint->checkpointSectionsListHead;
-		checkpointSectionList = checkpointSectionList->next) {
+	for (checkpoint_section_list = ckptCheckpoint->checkpointSectionsListHead.next;
+		checkpoint_section_list != &ckptCheckpoint->checkpointSectionsListHead;
+		checkpoint_section_list = checkpoint_section_list->next) {
 
-		ckptCheckpointSection = list_entry (checkpointSectionList,
+		ckptCheckpointSection = list_entry (checkpoint_section_list,
 			struct saCkptCheckpointSection, list);
 	
 		log_printf (LOG_LEVEL_DEBUG, "Checking section id %*s\n", 
@@ -406,7 +696,8 @@
 {
 	// Initialize the saved ring ID.
 	saved_ring_id.seq = 0;
-	saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;	
+	saved_ring_id.rep.s_addr = this_ip.sin_addr.s_addr;		
+	
 #ifdef TODO
 	int res;
 	res = totempg_recovery_plug_create (&ckpt_checkpoint_recovery_plug_handle);
@@ -511,10 +802,11 @@
 		ckptCheckpoint->unlinked = 0;
 		list_init (&ckptCheckpoint->list);
 		list_init (&ckptCheckpoint->checkpointSectionsListHead);
-		list_add (&ckptCheckpoint->list, &checkpointListHead);
+		list_add (&ckptCheckpoint->list, &checkpoint_list_head);
 		ckptCheckpoint->referenceCount = 0;
 		ckptCheckpoint->retention_timer = 0;
 		ckptCheckpoint->expired = 0;
+		initialize_ckpt_refcount_array(ckptCheckpoint->ckpt_refcount);
 
 		/*
 		 * Add in default checkpoint section
@@ -548,7 +840,24 @@
 	 */
 	log_printf (LOG_LEVEL_DEBUG, "CHECKPOINT opened is %p\n", ckptCheckpoint);
 	ckptCheckpoint->referenceCount += 1;
-
+	
+	// Add the connection reference information to the Checkpoint to be
+	// sent out later as a part of the sync process.
+	// RECOVERY_MUNI
+	 
+	 int proc_index = findProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+	 if (proc_index == -1) {//Could not find, lets set the processor to an index.
+	 	proc_index = setProcessorIndex(&source_addr,ckptCheckpoint->ckpt_refcount);
+	 }
+	 if (proc_index != -1 ) {	 
+	 	ckptCheckpoint->ckpt_refcount[proc_index].addr = source_addr;
+	 	ckptCheckpoint->ckpt_refcount[proc_index].count++;
+	 }
+	 else {
+	 	log_printf (LOG_LEVEL_ERROR, 
+	 				"CKPT: MAX LIMIT OF PROCESSORS reached. Cannot store new proc %p info.\n", 
+	 				ckptCheckpoint);
+	 }
 	/*
 	 * Reset retention duration since this checkpoint was just opened
 	 */
@@ -585,6 +894,46 @@
 	return (0);
 }
 
+//RECOVERY_MUNI
+// Apply one recovery sync message.  Messages stamped with a ring id other than
+// saved_ring_id are dropped.  Always returns 0; failures are only logged.
+static int message_handler_req_exec_ckpt_synchronize_state (void *message, struct in_addr source_addr, int endian_conversion_required) {
+	int retcode;
+	struct req_exec_ckpt_synchronize_state *req_exec_ckpt_sync_state 
+					= (struct req_exec_ckpt_synchronize_state *)message;
+	if (memcmp (&req_exec_ckpt_sync_state->previous_ring_id, &saved_ring_id,sizeof (struct memb_ring_id)) != 0) {
+			return(0);
+	}
+	// Only do an open/section create for the 1st message for a checkpoint from a given processor.
+	if (req_exec_ckpt_sync_state->sync_msg_sequence_number == 0) {	
+		struct req_exec_ckpt_checkpointopen request_open_exec;
+		// Zero the synthetic request: only two fields are set below, the rest must not be stack garbage.
+		memset(&request_open_exec, 0, sizeof(request_open_exec));
+		request_open_exec.req_lib_ckpt_checkpointopen.checkpointName = req_exec_ckpt_sync_state->checkpointName;
+		request_open_exec.req_lib_ckpt_checkpointopen.checkpointCreationAttributes = req_exec_ckpt_sync_state->checkpointCreationAttributes;
+		message_handler_req_exec_ckpt_checkpointopen(&request_open_exec, req_exec_ckpt_sync_state->source_addr, 0);
+		retcode = recovery_section_create (&req_exec_ckpt_sync_state->sectionDescriptor,
+											&req_exec_ckpt_sync_state->checkpointName);
+		if (retcode != SA_AIS_OK) {
+			log_printf(LOG_LEVEL_ERROR, "CKPT: message_handler_req_exec_ckpt_synchronize_state\n");
+			log_printf(LOG_LEVEL_ERROR, "CKPT: recovery_section_create returned %d\n",retcode);
+			return (0);
+		}
+	}	
+	// Write the contents of the section to the checkpoint section.
+	// NOTE(review): sectionData is a pointer value taken from the message - confirm it is valid in this process.
+	retcode = recovery_section_write(&req_exec_ckpt_sync_state->sectionDescriptor, 
+							&req_exec_ckpt_sync_state->checkpointName,
+							req_exec_ckpt_sync_state->sectionData, 
+							req_exec_ckpt_sync_state->dataOffSet, 
+							req_exec_ckpt_sync_state->dataSize);
+	if (retcode != SA_AIS_OK) {
+		log_printf(LOG_LEVEL_ERROR, "CKPT: message_handler_req_exec_ckpt_synchronize_state\n");
+		log_printf(LOG_LEVEL_ERROR, "CKPT: recovery_section_write returned %d\n",retcode);		
+	}
+	return (0);
+}
+
 unsigned int abstime_to_msec (SaTimeT time)
 {
 	struct timeval tv;
@@ -644,6 +993,19 @@
 	}
 
 	checkpoint->referenceCount--;
+	// Modify the connection reference information to the Checkpoint to be
+	// sent out later as a part of the sync process.
+	// RECOVERY_MUNI
+	
+	int proc_index = findProcessorIndex(&source_addr, checkpoint->ckpt_refcount);
+	if (proc_index != -1 ) {	 		
+	 	checkpoint->ckpt_refcount[proc_index].count--;
+	}
+	else {
+		log_printf (LOG_LEVEL_ERROR, 
+	 				"CKPT: Could Not find Processor Info %p info.\n", 
+	 				checkpoint);
+	}
 	assert (checkpoint->referenceCount >= 0);
 	log_printf (LOG_LEVEL_DEBUG, "disconnect called, new CKPT ref count is %d\n", 
 		checkpoint->referenceCount);
@@ -652,10 +1014,10 @@
 	 * If checkpoint has been unlinked and this is the last reference, delete it
 	 */
 	if (checkpoint->unlinked && checkpoint->referenceCount == 0) {
-		log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");
+		log_printf (LOG_LEVEL_DEBUG, "Unlinking checkpoint.\n");		
 		checkpoint_release (checkpoint);
 	} else
-	if (checkpoint->referenceCount == 0) {
+	if (checkpoint->referenceCount == 0) {		
 		poll_timer_add (aisexec_poll_handle,
 			checkpoint->checkpointCreationAttributes.retentionDuration / 1000000,
 			checkpoint,
@@ -791,6 +1153,86 @@
 	return (0);
 }
 
+// Recreate a section described by a sync message inside the named checkpoint.
+// The new section gets zero-filled data of sectionDescriptor->sectionSize bytes.
+// Returns SA_AIS_OK, SA_AIS_ERR_NOT_EXIST (unknown checkpoint),
+// SA_AIS_ERR_EXIST (section id already present) or SA_AIS_ERR_NO_MEMORY.
+static int recovery_section_create (SaCkptSectionDescriptorT *sectionDescriptor,
+									SaNameT *checkpointName) {
+	struct saCkptCheckpoint *ckptCheckpoint;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	void *initialData;
+	void *sectionId;
+	SaErrorT error = SA_AIS_OK;
+	ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName);
+	if (ckptCheckpoint == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	// Determine if user-specified checkpoint ID already exists
+	ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint,
+								((char *)sectionDescriptor->sectionId.id),
+								(int)sectionDescriptor->sectionId.idLen);
+	if (ckptCheckpointSection) {
+		error = SA_AIS_ERR_EXIST;
+		goto error_exit;
+	}
+
+	// Allocate checkpoint section
+	ckptCheckpointSection = malloc (sizeof (struct saCkptCheckpointSection));
+	if (ckptCheckpointSection == 0) {
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+	// Allocate checkpoint section data
+	initialData = malloc (sectionDescriptor->sectionSize);
+	if (initialData == 0) {
+		free (ckptCheckpointSection);
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+	// Allocate checkpoint section id
+	sectionId = malloc ((int)sectionDescriptor->sectionId.idLen);
+	if (sectionId == 0) {
+		free (ckptCheckpointSection);
+		free (initialData);
+		error = SA_AIS_ERR_NO_MEMORY;
+		goto error_exit;
+	}
+	// Copy checkpoint section ID and initialize data.
+	memcpy (sectionId, ((char *)sectionDescriptor->sectionId.id),
+		(int)sectionDescriptor->sectionId.idLen);
+	memset (initialData, 0, sectionDescriptor->sectionSize);
+
+	// Configure checkpoint section.  The section keeps its own copy of the
+	// id: the descriptor's pointer belongs to the caller's message, and the
+	// copy allocated above was otherwise never stored or freed.
+	ckptCheckpointSection->sectionDescriptor.sectionId.id = sectionId;
+	ckptCheckpointSection->sectionDescriptor.sectionId.idLen = sectionDescriptor->sectionId.idLen;
+	ckptCheckpointSection->sectionDescriptor.sectionSize = sectionDescriptor->sectionSize;
+	ckptCheckpointSection->sectionDescriptor.expirationTime = sectionDescriptor->expirationTime;
+	ckptCheckpointSection->sectionDescriptor.sectionState = SA_CKPT_SECTION_VALID;
+	ckptCheckpointSection->sectionDescriptor.lastUpdate = sectionDescriptor->lastUpdate;
+	ckptCheckpointSection->sectionData = initialData;
+	ckptCheckpointSection->expiration_timer = 0;
+
+	if (sectionDescriptor->expirationTime != SA_TIME_END) {
+		poll_timer_add (aisexec_poll_handle,
+			abstime_to_msec (ckptCheckpointSection->sectionDescriptor.expirationTime),
+			ckptCheckpointSection,
+			timer_function_section_expire,
+			&ckptCheckpointSection->expiration_timer);
+	}
+
+	// Add checkpoint section to checkpoint
+	list_init (&ckptCheckpointSection->list);
+	list_add (&ckptCheckpointSection->list,
+		&ckptCheckpoint->checkpointSectionsListHead);
+error_exit:
+	return (error);
+}
+
 static int message_handler_req_exec_ckpt_sectioncreate (void *message, struct in_addr source_addr, int endian_conversion_required) {
 	struct req_exec_ckpt_sectioncreate *req_exec_ckpt_sectioncreate = (struct req_exec_ckpt_sectioncreate *)message;
 	struct req_lib_ckpt_sectioncreate *req_lib_ckpt_sectioncreate = (struct req_lib_ckpt_sectioncreate *)&req_exec_ckpt_sectioncreate->req_lib_ckpt_sectioncreate;
@@ -1015,6 +1457,62 @@
 	return (0);
 }
 
+// Copy dataSize bytes from newData into the named checkpoint section at
+// dataOffSet, growing the section when the write ends past its current size.
+// Returns SA_AIS_OK, SA_AIS_ERR_NOT_EXIST (checkpoint or section missing)
+// or SA_AIS_ERR_NO_MEMORY (section could not be enlarged).
+static int recovery_section_write(SaCkptSectionDescriptorT *sectionDescriptor,
+									SaNameT *checkpointName,
+									void *newData,
+									SaUint32T dataOffSet,
+									SaUint32T dataSize) {
+	struct saCkptCheckpoint *ckptCheckpoint;
+	struct saCkptCheckpointSection *ckptCheckpointSection;
+	SaSizeT sizeRequired;
+	void *sectionData;
+	SaErrorT error = SA_AIS_OK;
+
+	log_printf (LOG_LEVEL_DEBUG, "CKPT: recovery_section_write.\n");
+	ckptCheckpoint = ckpt_checkpoint_find_global (checkpointName);
+	if (ckptCheckpoint == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	// Find checkpoint section to be written
+	ckptCheckpointSection = ckpt_checkpoint_find_globalSection (ckptCheckpoint,
+								((char *)sectionDescriptor->sectionId.id),
+								(int)sectionDescriptor->sectionId.idLen);
+	if (ckptCheckpointSection == 0) {
+		error = SA_AIS_ERR_NOT_EXIST;
+		goto error_exit;
+	}
+
+	// If write would extend past end of section data, enlarge section.
+	// Computed in SaSizeT, the type of sectionSize, rather than a signed int.
+	sizeRequired = (SaSizeT)dataOffSet + dataSize;
+	if (sizeRequired > ckptCheckpointSection->sectionDescriptor.sectionSize) {
+		sectionData = realloc (ckptCheckpointSection->sectionData, sizeRequired);
+		if (sectionData == 0) {
+			error = SA_AIS_ERR_NO_MEMORY;
+			goto error_exit;
+		}
+		// Install new section data
+		ckptCheckpointSection->sectionData = sectionData;
+		ckptCheckpointSection->sectionDescriptor.sectionSize = sizeRequired;
+	}
+
+	// Write checkpoint section to section data
+	if (dataSize > 0) {
+		memcpy ((char *)ckptCheckpointSection->sectionData + dataOffSet,
+			newData,
+			dataSize);
+	}
+error_exit:
+	return (error);
+}
+
+
 static int message_handler_req_exec_ckpt_sectionwrite (void *message, struct in_addr source_addr, int endian_conversion_required) {
 	struct req_exec_ckpt_sectionwrite *req_exec_ckpt_sectionwrite = (struct req_exec_ckpt_sectionwrite *)message;
 	struct req_lib_ckpt_sectionwrite *req_lib_ckpt_sectionwrite = (struct req_lib_ckpt_sectionwrite *)&req_exec_ckpt_sectionwrite->req_lib_ckpt_sectionwrite;
@@ -1258,7 +1756,7 @@
 		conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorCount = 0;
 		conn_info->ais_ci.u.libckpt_ci.sectionIterator.iteratorPos = 0;
 		list_add (&conn_info->ais_ci.u.libckpt_ci.sectionIterator.list,
-			&checkpointIteratorListHead);
+			&checkpoint_iterator_list_head);
 		list_init (&conn_info->ais_ci.u.libckpt_ci.checkpoint_list);
 		error = SA_AIS_OK;
 	}
@@ -1400,7 +1898,7 @@
 	struct saCkptCheckpoint *checkpoint;
 	int memoryUsed = 0;
 	int numberOfSections = 0;
-	struct list_head *checkpointSectionList;
+	struct list_head *checkpoint_section_list;
 	struct saCkptCheckpointSection *checkpointSection;
 
 	log_printf (LOG_LEVEL_DEBUG, "in status get\n");
@@ -1410,11 +1908,11 @@
 	 */
 	checkpoint = ckpt_checkpoint_find_global (&req_lib_ckpt_checkpointstatusget->checkpointName);
 
-	for (checkpointSectionList = checkpoint->checkpointSectionsListHead.next;
-		checkpointSectionList != &checkpoint->checkpointSectionsListHead;
-		checkpointSectionList = checkpointSectionList->next) {
+	for (checkpoint_section_list = checkpoint->checkpointSectionsListHead.next;
+		checkpoint_section_list != &checkpoint->checkpointSectionsListHead;
+		checkpoint_section_list = checkpoint_section_list->next) {
 
-		checkpointSection = list_entry (checkpointSectionList,
+		checkpointSection = list_entry (checkpoint_section_list,
 			struct saCkptCheckpointSection, list);
 
 		memoryUsed += checkpointSection->sectionDescriptor.sectionSize;
@@ -1730,7 +2228,7 @@
 	struct saCkptCheckpointSection *ckptCheckpointSection;
 	struct saCkptSectionIteratorEntry *ckptSectionIteratorEntries;
 	struct saCkptSectionIterator *ckptSectionIterator;
-	struct list_head *checkpointSectionList;
+	struct list_head *checkpoint_section_list;
 	int addEntry = 0;
 	int iteratorEntries = 0;
 	SaErrorT error = SA_AIS_OK;
@@ -1747,11 +2245,11 @@
 	/*
 	 * Iterate list of checkpoint sections
 	 */
-	for (checkpointSectionList = ckptCheckpoint->checkpointSectionsListHead.next;
-		checkpointSectionList != &ckptCheckpoint->checkpointSectionsListHead;
-		checkpointSectionList = checkpointSectionList->next) {
+	for (checkpoint_section_list = ckptCheckpoint->checkpointSectionsListHead.next;
+		checkpoint_section_list != &ckptCheckpoint->checkpointSectionsListHead;
+		checkpoint_section_list = checkpoint_section_list->next) {
 
-		ckptCheckpointSection = list_entry (checkpointSectionList,
+		ckptCheckpointSection = list_entry (checkpoint_section_list,
 			struct saCkptCheckpointSection, list);
 
 		addEntry = 1;
--- ../latest/include/ipc_ckpt.h	2005-02-23 12:56:16.000000000 -0600
+++ ../bk_openais/include/ipc_ckpt.h	2005-02-23 10:56:31.000000000 -0600
@@ -37,6 +37,8 @@
 #include "../include/ipc_gen.h"
 #include "../include/ais_types.h"
 #include "../include/saCkpt.h"
+#include "../exec/totemsrp.h"
+#include "../exec/ckpt.h"
 
 enum req_lib_ckpt_checkpoint_types {
 	MESSAGE_REQ_CKPT_CHECKPOINT_CHECKPOINTOPEN = 1,
@@ -326,4 +328,18 @@
 	struct res_header header;
 };
 
+struct req_exec_ckpt_synchronize_state {	/* recovery message: one chunk of one checkpoint section */
+	struct req_header header;
+	struct memb_ring_id previous_ring_id;	/* sender's saved_ring_id; receivers compare it with their own */
+	SaNameT checkpointName;
+	SaCkptCheckpointCreationAttributesT checkpointCreationAttributes;
+	SaCkptSectionDescriptorT sectionDescriptor;
+	void *sectionData;	/* NOTE(review): a pointer is only meaningful in the sender's process - confirm how the chunk bytes travel */
+	SaUint32T dataOffSet;	/* offset of this chunk within the section data */
+	SaUint32T dataSize;	/* number of bytes in this chunk */
+	struct in_addr source_addr;	/* address of the sending processor */
+	SaUint32T ref_count;	/* sender's reference count for the checkpoint */
+	SaUint32T sync_msg_sequence_number;	/* counts messages per checkpoint, starting at 0 */
+};
+
 #endif /* IPC_CKPT_H_DEFINED */
--- ../latest/include/ipc_gen.h	2005-02-23 12:56:16.000000000 -0600
+++ ../bk_openais/include/ipc_gen.h	2005-02-17 13:59:10.000000000 -0600
@@ -69,6 +69,7 @@
 	MESSAGE_REQ_EXEC_CKPT_SECTIONWRITE,
 	MESSAGE_REQ_EXEC_CKPT_SECTIONOVERWRITE,
 	MESSAGE_REQ_EXEC_CKPT_SECTIONREAD,
+	MESSAGE_REQ_EXEC_CKPT_SYNCHRONIZESTATE,
 	MESSAGE_REQ_EXEC_EVT_EVENTDATA,
 	MESSAGE_REQ_EXEC_EVT_CHANCMD,
 	MESSAGE_REQ_EXEC_EVT_RECOVERY_EVENTDATA


More information about the Openais mailing list