diff options
author | Tony Luck <tony.luck@intel.com> | 2005-10-28 15:27:03 -0700 |
---|---|---|
committer | Tony Luck <tony.luck@intel.com> | 2005-10-28 15:27:03 -0700 |
commit | fac84ef26759a3725bfc53ae3abf21976360aff3 (patch) | |
tree | 94aa9362e72a0736948adc78cb43ebf44f59580b | |
parent | d73dee6ee4b554074f88e4ebd956ea4db8552d52 (diff) | |
parent | 279290294662d4a35d209fb7d7c46523cfa3d63d (diff) | |
download | op-kernel-dev-fac84ef26759a3725bfc53ae3abf21976360aff3.zip op-kernel-dev-fac84ef26759a3725bfc53ae3abf21976360aff3.tar.gz |
Pull xpc-disengage into release branch
-rw-r--r-- | arch/ia64/sn/kernel/xpc.h | 366 | ||||
-rw-r--r-- | arch/ia64/sn/kernel/xpc_channel.c | 329 | ||||
-rw-r--r-- | arch/ia64/sn/kernel/xpc_main.c | 330 | ||||
-rw-r--r-- | arch/ia64/sn/kernel/xpc_partition.c | 473 | ||||
-rw-r--r-- | include/asm-ia64/sn/xp.h | 14 |
5 files changed, 1142 insertions, 370 deletions
diff --git a/arch/ia64/sn/kernel/xpc.h b/arch/ia64/sn/kernel/xpc.h index d0ee635..33df1b3 100644 --- a/arch/ia64/sn/kernel/xpc.h +++ b/arch/ia64/sn/kernel/xpc.h @@ -57,7 +57,7 @@ #define XPC_NASID_FROM_W_B(_w, _b) (((_w) * 64 + (_b)) * 2) #define XPC_HB_DEFAULT_INTERVAL 5 /* incr HB every x secs */ -#define XPC_HB_CHECK_DEFAULT_TIMEOUT 20 /* check HB every x secs */ +#define XPC_HB_CHECK_DEFAULT_INTERVAL 20 /* check HB every x secs */ /* define the process name of HB checker and the CPU it is pinned to */ #define XPC_HB_CHECK_THREAD_NAME "xpc_hb" @@ -67,34 +67,82 @@ #define XPC_DISCOVERY_THREAD_NAME "xpc_discovery" -#define XPC_HB_ALLOWED(_p, _v) ((_v)->heartbeating_to_mask & (1UL << (_p))) -#define XPC_ALLOW_HB(_p, _v) (_v)->heartbeating_to_mask |= (1UL << (_p)) -#define XPC_DISALLOW_HB(_p, _v) (_v)->heartbeating_to_mask &= (~(1UL << (_p))) - - /* - * Reserved Page provided by SAL. + * the reserved page + * + * SAL reserves one page of memory per partition for XPC. Though a full page + * in length (16384 bytes), its starting address is not page aligned, but it + * is cacheline aligned. The reserved page consists of the following: + * + * reserved page header + * + * The first cacheline of the reserved page contains the header + * (struct xpc_rsvd_page). Before SAL initialization has completed, + * SAL has set up the following fields of the reserved page header: + * SAL_signature, SAL_version, partid, and nasids_size. The other + * fields are set up by XPC. (xpc_rsvd_page points to the local + * partition's reserved page.) * - * SAL provides one page per partition of reserved memory. When SAL - * initialization is complete, SAL_signature, SAL_version, partid, - * part_nasids, and mach_nasids are set. + * part_nasids mask + * mach_nasids mask + * + * SAL also sets up two bitmaps (or masks), one that reflects the actual + * nasids in this partition (part_nasids), and the other that reflects + * the actual nasids in the entire machine (mach_nasids). We're only + * interested in the even numbered nasids (which contain the processors + * and/or memory), so we only need half as many bits to represent the + * nasids. The part_nasids mask is located starting at the first cacheline + * following the reserved page header. The mach_nasids mask follows right + * after the part_nasids mask. The size in bytes of each mask is reflected + * by the reserved page header field 'nasids_size'. (Local partition's + * mask pointers are xpc_part_nasids and xpc_mach_nasids.) + * + * vars + * vars part + * + * Immediately following the mach_nasids mask are the XPC variables + * required by other partitions. First are those that are generic to all + * partitions (vars), followed on the next available cacheline by those + * which are partition specific (vars part). These are setup by XPC. + * (Local partition's vars pointers are xpc_vars and xpc_vars_part.) * * Note: Until vars_pa is set, the partition XPC code has not been initialized. */ struct xpc_rsvd_page { - u64 SAL_signature; /* SAL unique signature */ - u64 SAL_version; /* SAL specified version */ - u8 partid; /* partition ID from SAL */ + u64 SAL_signature; /* SAL: unique signature */ + u64 SAL_version; /* SAL: version */ + u8 partid; /* SAL: partition ID */ u8 version; - u8 pad[6]; /* pad to u64 align */ + u8 pad1[6]; /* align to next u64 in cacheline */ volatile u64 vars_pa; - u64 part_nasids[XP_NASID_MASK_WORDS] ____cacheline_aligned; - u64 mach_nasids[XP_NASID_MASK_WORDS] ____cacheline_aligned; + struct timespec stamp; /* time when reserved page was setup by XPC */ + u64 pad2[9]; /* align to last u64 in cacheline */ + u64 nasids_size; /* SAL: size of each nasid mask in bytes */ }; -#define XPC_RP_VERSION _XPC_VERSION(1,0) /* version 1.0 of the reserved page */ -#define XPC_RSVD_PAGE_ALIGNED_SIZE \ - (L1_CACHE_ALIGN(sizeof(struct xpc_rsvd_page))) +#define XPC_RP_VERSION _XPC_VERSION(1,1) /* version 1.1 of the reserved page */ + +#define XPC_SUPPORTS_RP_STAMP(_version) \ + (_version >= _XPC_VERSION(1,1)) + +/* + * compare stamps - the return value is: + * + * < 0, if stamp1 < stamp2 + * = 0, if stamp1 == stamp2 + * > 0, if stamp1 > stamp2 + */ +static inline int +xpc_compare_stamps(struct timespec *stamp1, struct timespec *stamp2) +{ + int ret; + + + if ((ret = stamp1->tv_sec - stamp2->tv_sec) == 0) { + ret = stamp1->tv_nsec - stamp2->tv_nsec; + } + return ret; +} /* @@ -121,11 +169,58 @@ struct xpc_vars { u64 vars_part_pa; u64 amos_page_pa; /* paddr of page of AMOs from MSPEC driver */ AMO_t *amos_page; /* vaddr of page of AMOs from MSPEC driver */ - AMO_t *act_amos; /* pointer to the first activation AMO */ }; -#define XPC_V_VERSION _XPC_VERSION(3,0) /* version 3.0 of the cross vars */ -#define XPC_VARS_ALIGNED_SIZE (L1_CACHE_ALIGN(sizeof(struct xpc_vars))) +#define XPC_V_VERSION _XPC_VERSION(3,1) /* version 3.1 of the cross vars */ + +#define XPC_SUPPORTS_DISENGAGE_REQUEST(_version) \ + (_version >= _XPC_VERSION(3,1)) + + +static inline int +xpc_hb_allowed(partid_t partid, struct xpc_vars *vars) +{ + return ((vars->heartbeating_to_mask & (1UL << partid)) != 0); +} + +static inline void +xpc_allow_hb(partid_t partid, struct xpc_vars *vars) +{ + u64 old_mask, new_mask; + + do { + old_mask = vars->heartbeating_to_mask; + new_mask = (old_mask | (1UL << partid)); + } while (cmpxchg(&vars->heartbeating_to_mask, old_mask, new_mask) != + old_mask); +} + +static inline void +xpc_disallow_hb(partid_t partid, struct xpc_vars *vars) +{ + u64 old_mask, new_mask; + + do { + old_mask = vars->heartbeating_to_mask; + new_mask = (old_mask & ~(1UL << partid)); + } while (cmpxchg(&vars->heartbeating_to_mask, old_mask, new_mask) != + old_mask); +} + + +/* + * The AMOs page consists of a number of AMO variables which are divided into + * four groups, The first two groups are used to identify an IRQ's sender. + * These two groups consist of 64 and 128 AMO variables respectively. The last + * two groups, consisting of just one AMO variable each, are used to identify + * the remote partitions that are currently engaged (from the viewpoint of + * the XPC running on the remote partition). + */ +#define XPC_NOTIFY_IRQ_AMOS 0 +#define XPC_ACTIVATE_IRQ_AMOS (XPC_NOTIFY_IRQ_AMOS + XP_MAX_PARTITIONS) +#define XPC_ENGAGED_PARTITIONS_AMO (XPC_ACTIVATE_IRQ_AMOS + XP_NASID_MASK_WORDS) +#define XPC_DISENGAGE_REQUEST_AMO (XPC_ENGAGED_PARTITIONS_AMO + 1) + /* * The following structure describes the per partition specific variables. @@ -165,6 +260,16 @@ struct xpc_vars_part { #define XPC_VP_MAGIC2 0x0073726176435058L /* 'XPCvars\0'L (little endian) */ +/* the reserved page sizes and offsets */ + +#define XPC_RP_HEADER_SIZE L1_CACHE_ALIGN(sizeof(struct xpc_rsvd_page)) +#define XPC_RP_VARS_SIZE L1_CACHE_ALIGN(sizeof(struct xpc_vars)) + +#define XPC_RP_PART_NASIDS(_rp) (u64 *) ((u8 *) _rp + XPC_RP_HEADER_SIZE) +#define XPC_RP_MACH_NASIDS(_rp) (XPC_RP_PART_NASIDS(_rp) + xp_nasid_mask_words) +#define XPC_RP_VARS(_rp) ((struct xpc_vars *) XPC_RP_MACH_NASIDS(_rp) + xp_nasid_mask_words) +#define XPC_RP_VARS_PART(_rp) (struct xpc_vars_part *) ((u8 *) XPC_RP_VARS(rp) + XPC_RP_VARS_SIZE) + /* * Functions registered by add_timer() or called by kernel_thread() only @@ -349,6 +454,9 @@ struct xpc_channel { atomic_t n_on_msg_allocate_wq; /* #on msg allocation wait queue */ wait_queue_head_t msg_allocate_wq; /* msg allocation wait queue */ + u8 delayed_IPI_flags; /* IPI flags received, but delayed */ + /* action until channel disconnected */ + /* queue of msg senders who want to be notified when msg received */ atomic_t n_to_notify; /* #of msg senders to notify */ @@ -358,7 +466,7 @@ struct xpc_channel { void *key; /* pointer to user's key */ struct semaphore msg_to_pull_sema; /* next msg to pull serialization */ - struct semaphore teardown_sema; /* wait for teardown completion */ + struct semaphore wdisconnect_sema; /* wait for channel disconnect */ struct xpc_openclose_args *local_openclose_args; /* args passed on */ /* opening or closing of channel */ @@ -410,6 +518,8 @@ struct xpc_channel { #define XPC_C_DISCONNECTED 0x00002000 /* channel is disconnected */ #define XPC_C_DISCONNECTING 0x00004000 /* channel is being disconnected */ +#define XPC_C_DISCONNECTCALLOUT 0x00008000 /* chan disconnected callout made */ +#define XPC_C_WDISCONNECT 0x00010000 /* waiting for channel disconnect */ @@ -422,6 +532,8 @@ struct xpc_partition { /* XPC HB infrastructure */ + u8 remote_rp_version; /* version# of partition's rsvd pg */ + struct timespec remote_rp_stamp;/* time when rsvd pg was initialized */ u64 remote_rp_pa; /* phys addr of partition's rsvd pg */ u64 remote_vars_pa; /* phys addr of partition's vars */ u64 remote_vars_part_pa; /* phys addr of partition's vars part */ @@ -432,14 +544,18 @@ struct xpc_partition { u32 act_IRQ_rcvd; /* IRQs since activation */ spinlock_t act_lock; /* protect updating of act_state */ u8 act_state; /* from XPC HB viewpoint */ + u8 remote_vars_version; /* version# of partition's vars */ enum xpc_retval reason; /* reason partition is deactivating */ int reason_line; /* line# deactivation initiated from */ int reactivate_nasid; /* nasid in partition to reactivate */ + unsigned long disengage_request_timeout; /* timeout in jiffies */ + struct timer_list disengage_request_timer; + /* XPC infrastructure referencing and teardown control */ - volatile u8 setup_state; /* infrastructure setup state */ + volatile u8 setup_state; /* infrastructure setup state */ wait_queue_head_t teardown_wq; /* kthread waiting to teardown infra */ atomic_t references; /* #of references to infrastructure */ @@ -454,6 +570,7 @@ struct xpc_partition { u8 nchannels; /* #of defined channels supported */ atomic_t nchannels_active; /* #of channels that are not DISCONNECTED */ + atomic_t nchannels_engaged;/* #of channels engaged with remote part */ struct xpc_channel *channels;/* array of channel structures */ void *local_GPs_base; /* base address of kmalloc'd space */ @@ -518,6 +635,7 @@ struct xpc_partition { #define XPC_P_TORNDOWN 0x03 /* infrastructure is torndown */ + /* * struct xpc_partition IPI_timer #of seconds to wait before checking for * dropped IPIs. These occur whenever an IPI amo write doesn't complete until @@ -526,6 +644,13 @@ struct xpc_partition { #define XPC_P_DROPPED_IPI_WAIT (0.25 * HZ) +/* number of seconds to wait for other partitions to disengage */ +#define XPC_DISENGAGE_REQUEST_DEFAULT_TIMELIMIT 90 + +/* interval in seconds to print 'waiting disengagement' messages */ +#define XPC_DISENGAGE_PRINTMSG_INTERVAL 10 + + #define XPC_PARTID(_p) ((partid_t) ((_p) - &xpc_partitions[0])) @@ -534,24 +659,20 @@ struct xpc_partition { extern struct xpc_registration xpc_registrations[]; -/* >>> found in xpc_main.c only */ +/* found in xpc_main.c */ extern struct device *xpc_part; extern struct device *xpc_chan; +extern int xpc_disengage_request_timelimit; extern irqreturn_t xpc_notify_IRQ_handler(int, void *, struct pt_regs *); extern void xpc_dropped_IPI_check(struct xpc_partition *); +extern void xpc_activate_partition(struct xpc_partition *); extern void xpc_activate_kthreads(struct xpc_channel *, int); extern void xpc_create_kthreads(struct xpc_channel *, int); extern void xpc_disconnect_wait(int); -/* found in xpc_main.c and efi-xpc.c */ -extern void xpc_activate_partition(struct xpc_partition *); - - /* found in xpc_partition.c */ extern int xpc_exiting; -extern int xpc_hb_interval; -extern int xpc_hb_check_interval; extern struct xpc_vars *xpc_vars; extern struct xpc_rsvd_page *xpc_rsvd_page; extern struct xpc_vars_part *xpc_vars_part; @@ -561,6 +682,7 @@ extern struct xpc_rsvd_page *xpc_rsvd_page_init(void); extern void xpc_allow_IPI_ops(void); extern void xpc_restrict_IPI_ops(void); extern int xpc_identify_act_IRQ_sender(void); +extern int xpc_partition_disengaged(struct xpc_partition *); extern enum xpc_retval xpc_mark_partition_active(struct xpc_partition *); extern void xpc_mark_partition_inactive(struct xpc_partition *); extern void xpc_discovery(void); @@ -585,8 +707,8 @@ extern void xpc_connected_callout(struct xpc_channel *); extern void xpc_deliver_msg(struct xpc_channel *); extern void xpc_disconnect_channel(const int, struct xpc_channel *, enum xpc_retval, unsigned long *); -extern void xpc_disconnected_callout(struct xpc_channel *); -extern void xpc_partition_down(struct xpc_partition *, enum xpc_retval); +extern void xpc_disconnecting_callout(struct xpc_channel *); +extern void xpc_partition_going_down(struct xpc_partition *, enum xpc_retval); extern void xpc_teardown_infrastructure(struct xpc_partition *); @@ -674,6 +796,157 @@ xpc_part_ref(struct xpc_partition *part) /* + * This next set of inlines are used to keep track of when a partition is + * potentially engaged in accessing memory belonging to another partition. + */ + +static inline void +xpc_mark_partition_engaged(struct xpc_partition *part) +{ + unsigned long irq_flags; + AMO_t *amo = (AMO_t *) __va(part->remote_amos_page_pa + + (XPC_ENGAGED_PARTITIONS_AMO * sizeof(AMO_t))); + + + local_irq_save(irq_flags); + + /* set bit corresponding to our partid in remote partition's AMO */ + FETCHOP_STORE_OP(TO_AMO((u64) &amo->variable), FETCHOP_OR, + (1UL << sn_partition_id)); + /* + * We must always use the nofault function regardless of whether we + * are on a Shub 1.1 system or a Shub 1.2 slice 0xc processor. If we + * didn't, we'd never know that the other partition is down and would + * keep sending IPIs and AMOs to it until the heartbeat times out. + */ + (void) xp_nofault_PIOR((u64 *) GLOBAL_MMR_ADDR(NASID_GET(&amo-> + variable), xp_nofault_PIOR_target)); + + local_irq_restore(irq_flags); +} + +static inline void +xpc_mark_partition_disengaged(struct xpc_partition *part) +{ + unsigned long irq_flags; + AMO_t *amo = (AMO_t *) __va(part->remote_amos_page_pa + + (XPC_ENGAGED_PARTITIONS_AMO * sizeof(AMO_t))); + + + local_irq_save(irq_flags); + + /* clear bit corresponding to our partid in remote partition's AMO */ + FETCHOP_STORE_OP(TO_AMO((u64) &amo->variable), FETCHOP_AND, + ~(1UL << sn_partition_id)); + /* + * We must always use the nofault function regardless of whether we + * are on a Shub 1.1 system or a Shub 1.2 slice 0xc processor. If we + * didn't, we'd never know that the other partition is down and would + * keep sending IPIs and AMOs to it until the heartbeat times out. + */ + (void) xp_nofault_PIOR((u64 *) GLOBAL_MMR_ADDR(NASID_GET(&amo-> + variable), xp_nofault_PIOR_target)); + + local_irq_restore(irq_flags); +} + +static inline void +xpc_request_partition_disengage(struct xpc_partition *part) +{ + unsigned long irq_flags; + AMO_t *amo = (AMO_t *) __va(part->remote_amos_page_pa + + (XPC_DISENGAGE_REQUEST_AMO * sizeof(AMO_t))); + + + local_irq_save(irq_flags); + + /* set bit corresponding to our partid in remote partition's AMO */ + FETCHOP_STORE_OP(TO_AMO((u64) &amo->variable), FETCHOP_OR, + (1UL << sn_partition_id)); + /* + * We must always use the nofault function regardless of whether we + * are on a Shub 1.1 system or a Shub 1.2 slice 0xc processor. If we + * didn't, we'd never know that the other partition is down and would + * keep sending IPIs and AMOs to it until the heartbeat times out. + */ + (void) xp_nofault_PIOR((u64 *) GLOBAL_MMR_ADDR(NASID_GET(&amo-> + variable), xp_nofault_PIOR_target)); + + local_irq_restore(irq_flags); +} + +static inline void +xpc_cancel_partition_disengage_request(struct xpc_partition *part) +{ + unsigned long irq_flags; + AMO_t *amo = (AMO_t *) __va(part->remote_amos_page_pa + + (XPC_DISENGAGE_REQUEST_AMO * sizeof(AMO_t))); + + + local_irq_save(irq_flags); + + /* clear bit corresponding to our partid in remote partition's AMO */ + FETCHOP_STORE_OP(TO_AMO((u64) &amo->variable), FETCHOP_AND, + ~(1UL << sn_partition_id)); + /* + * We must always use the nofault function regardless of whether we + * are on a Shub 1.1 system or a Shub 1.2 slice 0xc processor. If we + * didn't, we'd never know that the other partition is down and would + * keep sending IPIs and AMOs to it until the heartbeat times out. + */ + (void) xp_nofault_PIOR((u64 *) GLOBAL_MMR_ADDR(NASID_GET(&amo-> + variable), xp_nofault_PIOR_target)); + + local_irq_restore(irq_flags); +} + +static inline u64 +xpc_partition_engaged(u64 partid_mask) +{ + AMO_t *amo = xpc_vars->amos_page + XPC_ENGAGED_PARTITIONS_AMO; + + + /* return our partition's AMO variable ANDed with partid_mask */ + return (FETCHOP_LOAD_OP(TO_AMO((u64) &amo->variable), FETCHOP_LOAD) & + partid_mask); +} + +static inline u64 +xpc_partition_disengage_requested(u64 partid_mask) +{ + AMO_t *amo = xpc_vars->amos_page + XPC_DISENGAGE_REQUEST_AMO; + + + /* return our partition's AMO variable ANDed with partid_mask */ + return (FETCHOP_LOAD_OP(TO_AMO((u64) &amo->variable), FETCHOP_LOAD) & + partid_mask); +} + +static inline void +xpc_clear_partition_engaged(u64 partid_mask) +{ + AMO_t *amo = xpc_vars->amos_page + XPC_ENGAGED_PARTITIONS_AMO; + + + /* clear bit(s) based on partid_mask in our partition's AMO */ + FETCHOP_STORE_OP(TO_AMO((u64) &amo->variable), FETCHOP_AND, + ~partid_mask); +} + +static inline void +xpc_clear_partition_disengage_request(u64 partid_mask) +{ + AMO_t *amo = xpc_vars->amos_page + XPC_DISENGAGE_REQUEST_AMO; + + + /* clear bit(s) based on partid_mask in our partition's AMO */ + FETCHOP_STORE_OP(TO_AMO((u64) &amo->variable), FETCHOP_AND, + ~partid_mask); +} + + + +/* * The following set of macros and inlines are used for the sending and * receiving of IPIs (also known as IRQs). There are two flavors of IPIs, * one that is associated with partition activity (SGI_XPC_ACTIVATE) and @@ -722,13 +995,13 @@ xpc_IPI_send(AMO_t *amo, u64 flag, int nasid, int phys_cpuid, int vector) * Flag the appropriate AMO variable and send an IPI to the specified node. */ static inline void -xpc_activate_IRQ_send(u64 amos_page, int from_nasid, int to_nasid, +xpc_activate_IRQ_send(u64 amos_page_pa, int from_nasid, int to_nasid, int to_phys_cpuid) { int w_index = XPC_NASID_W_INDEX(from_nasid); int b_index = XPC_NASID_B_INDEX(from_nasid); - AMO_t *amos = (AMO_t *) __va(amos_page + - (XP_MAX_PARTITIONS * sizeof(AMO_t))); + AMO_t *amos = (AMO_t *) __va(amos_page_pa + + (XPC_ACTIVATE_IRQ_AMOS * sizeof(AMO_t))); (void) xpc_IPI_send(&amos[w_index], (1UL << b_index), to_nasid, @@ -756,6 +1029,13 @@ xpc_IPI_send_reactivate(struct xpc_partition *part) xpc_vars->act_nasid, xpc_vars->act_phys_cpuid); } +static inline void +xpc_IPI_send_disengage(struct xpc_partition *part) +{ + xpc_activate_IRQ_send(part->remote_amos_page_pa, cnodeid_to_nasid(0), + part->remote_act_nasid, part->remote_act_phys_cpuid); +} + /* * IPIs associated with SGI_XPC_NOTIFY IRQ. @@ -836,6 +1116,7 @@ xpc_notify_IRQ_send_local(struct xpc_channel *ch, u8 ipi_flag, /* given an AMO variable and a channel#, get its associated IPI flags */ #define XPC_GET_IPI_FLAGS(_amo, _c) ((u8) (((_amo) >> ((_c) * 8)) & 0xff)) +#define XPC_SET_IPI_FLAGS(_amo, _c, _f) (_amo) |= ((u64) (_f) << ((_c) * 8)) #define XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(_amo) ((_amo) & 0x0f0f0f0f0f0f0f0f) #define XPC_ANY_MSG_IPI_FLAGS_SET(_amo) ((_amo) & 0x1010101010101010) @@ -903,17 +1184,18 @@ xpc_IPI_send_local_msgrequest(struct xpc_channel *ch) * cacheable mapping for the entire region. This will prevent speculative * reading of cached copies of our lines from being issued which will cause * a PI FSB Protocol error to be generated by the SHUB. For XPC, we need 64 - * (XP_MAX_PARTITIONS) AMO variables for message notification (xpc_main.c) - * and an additional 16 AMO variables for partition activation (xpc_hb.c). + * AMO variables (based on XP_MAX_PARTITIONS) for message notification and an + * additional 128 AMO variables (based on XP_NASID_MASK_WORDS) for partition + * activation and 2 AMO variables for partition deactivation. */ static inline AMO_t * -xpc_IPI_init(partid_t partid) +xpc_IPI_init(int index) { - AMO_t *part_amo = xpc_vars->amos_page + partid; + AMO_t *amo = xpc_vars->amos_page + index; - xpc_IPI_receive(part_amo); - return part_amo; + (void) xpc_IPI_receive(amo); /* clear AMO variable */ + return amo; } diff --git a/arch/ia64/sn/kernel/xpc_channel.c b/arch/ia64/sn/kernel/xpc_channel.c index 94698be..abf4fc2 100644 --- a/arch/ia64/sn/kernel/xpc_channel.c +++ b/arch/ia64/sn/kernel/xpc_channel.c @@ -57,6 +57,7 @@ xpc_initialize_channels(struct xpc_partition *part, partid_t partid) spin_lock_init(&ch->lock); sema_init(&ch->msg_to_pull_sema, 1); /* mutex */ + sema_init(&ch->wdisconnect_sema, 0); /* event wait */ atomic_set(&ch->n_on_msg_allocate_wq, 0); init_waitqueue_head(&ch->msg_allocate_wq); @@ -166,6 +167,7 @@ xpc_setup_infrastructure(struct xpc_partition *part) xpc_initialize_channels(part, partid); atomic_set(&part->nchannels_active, 0); + atomic_set(&part->nchannels_engaged, 0); /* local_IPI_amo were set to 0 by an earlier memset() */ @@ -555,8 +557,6 @@ xpc_allocate_msgqueues(struct xpc_channel *ch) sema_init(&ch->notify_queue[i].sema, 0); } - sema_init(&ch->teardown_sema, 0); /* event wait */ - spin_lock_irqsave(&ch->lock, irq_flags); ch->flags |= XPC_C_SETUP; spin_unlock_irqrestore(&ch->lock, irq_flags); @@ -626,6 +626,55 @@ xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags) /* + * Notify those who wanted to be notified upon delivery of their message. + */ +static void +xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put) +{ + struct xpc_notify *notify; + u8 notify_type; + s64 get = ch->w_remote_GP.get - 1; + + + while (++get < put && atomic_read(&ch->n_to_notify) > 0) { + + notify = &ch->notify_queue[get % ch->local_nentries]; + + /* + * See if the notify entry indicates it was associated with + * a message who's sender wants to be notified. It is possible + * that it is, but someone else is doing or has done the + * notification. + */ + notify_type = notify->type; + if (notify_type == 0 || + cmpxchg(¬ify->type, notify_type, 0) != + notify_type) { + continue; + } + + DBUG_ON(notify_type != XPC_N_CALL); + + atomic_dec(&ch->n_to_notify); + + if (notify->func != NULL) { + dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, " + "msg_number=%ld, partid=%d, channel=%d\n", + (void *) notify, get, ch->partid, ch->number); + + notify->func(reason, ch->partid, ch->number, + notify->key); + + dev_dbg(xpc_chan, "notify->func() returned, " + "notify=0x%p, msg_number=%ld, partid=%d, " + "channel=%d\n", (void *) notify, get, + ch->partid, ch->number); + } + } +} + + +/* * Free up message queues and other stuff that were allocated for the specified * channel. * @@ -669,9 +718,6 @@ xpc_free_msgqueues(struct xpc_channel *ch) ch->remote_msgqueue = NULL; kfree(ch->notify_queue); ch->notify_queue = NULL; - - /* in case someone is waiting for the teardown to complete */ - up(&ch->teardown_sema); } } @@ -683,7 +729,7 @@ static void xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) { struct xpc_partition *part = &xpc_partitions[ch->partid]; - u32 ch_flags = ch->flags; + u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED); DBUG_ON(!spin_is_locked(&ch->lock)); @@ -701,12 +747,13 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) } DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0); - /* it's now safe to free the channel's message queues */ - - xpc_free_msgqueues(ch); - DBUG_ON(ch->flags & XPC_C_SETUP); + if (part->act_state == XPC_P_DEACTIVATING) { + /* can't proceed until the other side disengages from us */ + if (xpc_partition_engaged(1UL << ch->partid)) { + return; + } - if (part->act_state != XPC_P_DEACTIVATING) { + } else { /* as long as the other side is up do the full protocol */ @@ -724,16 +771,42 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags) } } + /* wake those waiting for notify completion */ + if (atomic_read(&ch->n_to_notify) > 0) { + /* >>> we do callout while holding ch->lock */ + xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put); + } + /* both sides are disconnected now */ - ch->flags = XPC_C_DISCONNECTED; /* clear all flags, but this one */ + /* it's now safe to free the channel's message queues */ + xpc_free_msgqueues(ch); + + /* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */ + ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT)); atomic_dec(&part->nchannels_active); - if (ch_flags & XPC_C_WASCONNECTED) { + if (channel_was_connected) { dev_info(xpc_chan, "channel %d to partition %d disconnected, " "reason=%d\n", ch->number, ch->partid, ch->reason); } + + if (ch->flags & XPC_C_WDISCONNECT) { + spin_unlock_irqrestore(&ch->lock, *irq_flags); + up(&ch->wdisconnect_sema); + spin_lock_irqsave(&ch->lock, *irq_flags); + + } else if (ch->delayed_IPI_flags) { + if (part->act_state != XPC_P_DEACTIVATING) { + /* time to take action on any delayed IPI flags */ + spin_lock(&part->IPI_lock); + XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number, + ch->delayed_IPI_flags); + spin_unlock(&part->IPI_lock); + } + ch->delayed_IPI_flags = 0; + } } @@ -754,6 +827,19 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, spin_lock_irqsave(&ch->lock, irq_flags); +again: + + if ((ch->flags & XPC_C_DISCONNECTED) && + (ch->flags & XPC_C_WDISCONNECT)) { + /* + * Delay processing IPI flags until thread waiting disconnect + * has had a chance to see that the channel is disconnected. + */ + ch->delayed_IPI_flags |= IPI_flags; + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } + if (IPI_flags & XPC_IPI_CLOSEREQUEST) { @@ -764,7 +850,7 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, /* * If RCLOSEREQUEST is set, we're probably waiting for * RCLOSEREPLY. We should find it and a ROPENREQUEST packed - * with this RCLOSEQREUQEST in the IPI_flags. + * with this RCLOSEREQUEST in the IPI_flags. */ if (ch->flags & XPC_C_RCLOSEREQUEST) { @@ -779,14 +865,22 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, /* both sides have finished disconnecting */ xpc_process_disconnect(ch, &irq_flags); + DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED)); + goto again; } if (ch->flags & XPC_C_DISCONNECTED) { - // >>> explain this section - if (!(IPI_flags & XPC_IPI_OPENREQUEST)) { - DBUG_ON(part->act_state != - XPC_P_DEACTIVATING); + if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, + ch_number) & XPC_IPI_OPENREQUEST)) { + + DBUG_ON(ch->delayed_IPI_flags != 0); + spin_lock(&part->IPI_lock); + XPC_SET_IPI_FLAGS(part->local_IPI_amo, + ch_number, + XPC_IPI_CLOSEREQUEST); + spin_unlock(&part->IPI_lock); + } spin_unlock_irqrestore(&ch->lock, irq_flags); return; } @@ -816,9 +910,13 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, } XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags); - } else { - xpc_process_disconnect(ch, &irq_flags); + + DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY); + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; } + + xpc_process_disconnect(ch, &irq_flags); } @@ -834,7 +932,20 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, } DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST)); - DBUG_ON(!(ch->flags & XPC_C_RCLOSEREQUEST)); + + if (!(ch->flags & XPC_C_RCLOSEREQUEST)) { + if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number) + & XPC_IPI_CLOSEREQUEST)) { + + DBUG_ON(ch->delayed_IPI_flags != 0); + spin_lock(&part->IPI_lock); + XPC_SET_IPI_FLAGS(part->local_IPI_amo, + ch_number, XPC_IPI_CLOSEREPLY); + spin_unlock(&part->IPI_lock); + } + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } ch->flags |= XPC_C_RCLOSEREPLY; @@ -852,8 +963,14 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, "channel=%d\n", args->msg_size, args->local_nentries, ch->partid, ch->number); - if ((ch->flags & XPC_C_DISCONNECTING) || - part->act_state == XPC_P_DEACTIVATING) { + if (part->act_state == XPC_P_DEACTIVATING || + (ch->flags & XPC_C_ROPENREQUEST)) { + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } + + if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) { + ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST; spin_unlock_irqrestore(&ch->lock, irq_flags); return; } @@ -867,8 +984,11 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, * msg_size = size of channel's messages in bytes * local_nentries = remote partition's local_nentries */ - DBUG_ON(args->msg_size == 0); - DBUG_ON(args->local_nentries == 0); + if (args->msg_size == 0 || args->local_nentries == 0) { + /* assume OPENREQUEST was delayed by mistake */ + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING); ch->remote_nentries = args->local_nentries; @@ -906,7 +1026,13 @@ xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number, spin_unlock_irqrestore(&ch->lock, irq_flags); return; } - DBUG_ON(!(ch->flags & XPC_C_OPENREQUEST)); + if (!(ch->flags & XPC_C_OPENREQUEST)) { + XPC_DISCONNECT_CHANNEL(ch, xpcOpenCloseError, + &irq_flags); + spin_unlock_irqrestore(&ch->lock, irq_flags); + return; + } + DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST)); DBUG_ON(ch->flags & XPC_C_CONNECTED); @@ -960,8 +1086,8 @@ xpc_connect_channel(struct xpc_channel *ch) struct xpc_registration *registration = &xpc_registrations[ch->number]; - if (down_interruptible(®istration->sema) != 0) { - return xpcInterrupted; + if (down_trylock(®istration->sema) != 0) { + return xpcRetry; } if (!XPC_CHANNEL_REGISTERED(ch->number)) { @@ -1040,55 +1166,6 @@ xpc_connect_channel(struct xpc_channel *ch) /* - * Notify those who wanted to be notified upon delivery of their message. - */ -static void -xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put) -{ - struct xpc_notify *notify; - u8 notify_type; - s64 get = ch->w_remote_GP.get - 1; - - - while (++get < put && atomic_read(&ch->n_to_notify) > 0) { - - notify = &ch->notify_queue[get % ch->local_nentries]; - - /* - * See if the notify entry indicates it was associated with - * a message who's sender wants to be notified. It is possible - * that it is, but someone else is doing or has done the - * notification. - */ - notify_type = notify->type; - if (notify_type == 0 || - cmpxchg(¬ify->type, notify_type, 0) != - notify_type) { - continue; - } - - DBUG_ON(notify_type != XPC_N_CALL); - - atomic_dec(&ch->n_to_notify); - - if (notify->func != NULL) { - dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, " - "msg_number=%ld, partid=%d, channel=%d\n", - (void *) notify, get, ch->partid, ch->number); - - notify->func(reason, ch->partid, ch->number, - notify->key); - - dev_dbg(xpc_chan, "notify->func() returned, " - "notify=0x%p, msg_number=%ld, partid=%d, " - "channel=%d\n", (void *) notify, get, - ch->partid, ch->number); - } - } -} - - -/* * Clear some of the msg flags in the local message queue. */ static inline void @@ -1240,6 +1317,7 @@ xpc_process_channel_activity(struct xpc_partition *part) u64 IPI_amo, IPI_flags; struct xpc_channel *ch; int ch_number; + u32 ch_flags; IPI_amo = xpc_get_IPI_flags(part); @@ -1266,8 +1344,9 @@ xpc_process_channel_activity(struct xpc_partition *part) xpc_process_openclose_IPI(part, ch_number, IPI_flags); } + ch_flags = ch->flags; /* need an atomic snapshot of flags */ - if (ch->flags & XPC_C_DISCONNECTING) { + if (ch_flags & XPC_C_DISCONNECTING) { spin_lock_irqsave(&ch->lock, irq_flags); xpc_process_disconnect(ch, &irq_flags); spin_unlock_irqrestore(&ch->lock, irq_flags); @@ -1278,9 +1357,9 @@ xpc_process_channel_activity(struct xpc_partition *part) continue; } - if (!(ch->flags & XPC_C_CONNECTED)) { - if (!(ch->flags & XPC_C_OPENREQUEST)) { - DBUG_ON(ch->flags & XPC_C_SETUP); + if (!(ch_flags & XPC_C_CONNECTED)) { + if (!(ch_flags & XPC_C_OPENREQUEST)) { + DBUG_ON(ch_flags & XPC_C_SETUP); (void) xpc_connect_channel(ch); } else { spin_lock_irqsave(&ch->lock, irq_flags); @@ -1305,8 +1384,8 @@ xpc_process_channel_activity(struct xpc_partition *part) /* - * XPC's heartbeat code calls this function to inform XPC that a partition has - * gone down. XPC responds by tearing down the XPartition Communication + * XPC's heartbeat code calls this function to inform XPC that a partition is + * going down. XPC responds by tearing down the XPartition Communication * infrastructure used for the just downed partition. * * XPC's heartbeat code will never call this function and xpc_partition_up() @@ -1314,7 +1393,7 @@ xpc_process_channel_activity(struct xpc_partition *part) * at the same time. */ void -xpc_partition_down(struct xpc_partition *part, enum xpc_retval reason) +xpc_partition_going_down(struct xpc_partition *part, enum xpc_retval reason) { unsigned long irq_flags; int ch_number; @@ -1330,12 +1409,11 @@ xpc_partition_down(struct xpc_partition *part, enum xpc_retval reason) } - /* disconnect all channels associated with the downed partition */ + /* disconnect channels associated with the partition going down */ for (ch_number = 0; ch_number < part->nchannels; ch_number++) { ch = &part->channels[ch_number]; - xpc_msgqueue_ref(ch); spin_lock_irqsave(&ch->lock, irq_flags); @@ -1370,6 +1448,7 @@ xpc_teardown_infrastructure(struct xpc_partition *part) * this partition. */ + DBUG_ON(atomic_read(&part->nchannels_engaged) != 0); DBUG_ON(atomic_read(&part->nchannels_active) != 0); DBUG_ON(part->setup_state != XPC_P_SETUP); part->setup_state = XPC_P_WTEARDOWN; @@ -1428,19 +1507,11 @@ xpc_initiate_connect(int ch_number) if (xpc_part_ref(part)) { ch = &part->channels[ch_number]; - if (!(ch->flags & XPC_C_DISCONNECTING)) { - DBUG_ON(ch->flags & XPC_C_OPENREQUEST); - DBUG_ON(ch->flags & XPC_C_CONNECTED); - DBUG_ON(ch->flags & XPC_C_SETUP); - - /* - * Initiate the establishment of a connection - * on the newly registered channel to the - * remote partition. - */ - xpc_wakeup_channel_mgr(part); - } - + /* + * Initiate the establishment of a connection on the + * newly registered channel to the remote partition. + */ + xpc_wakeup_channel_mgr(part); xpc_part_deref(part); } } @@ -1450,9 +1521,6 @@ xpc_initiate_connect(int ch_number) void xpc_connected_callout(struct xpc_channel *ch) { - unsigned long irq_flags; - - /* let the registerer know that a connection has been established */ if (ch->func != NULL) { @@ -1465,10 +1533,6 @@ xpc_connected_callout(struct xpc_channel *ch) dev_dbg(xpc_chan, "ch->func() returned, reason=xpcConnected, " "partid=%d, channel=%d\n", ch->partid, ch->number); } - - spin_lock_irqsave(&ch->lock, irq_flags); - ch->flags |= XPC_C_CONNECTCALLOUT; - spin_unlock_irqrestore(&ch->lock, irq_flags); } @@ -1506,8 +1570,12 @@ xpc_initiate_disconnect(int ch_number) spin_lock_irqsave(&ch->lock, irq_flags); - XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering, + if (!(ch->flags & XPC_C_DISCONNECTED)) { + ch->flags |= XPC_C_WDISCONNECT; + + XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering, &irq_flags); + } spin_unlock_irqrestore(&ch->lock, irq_flags); @@ -1523,8 +1591,9 @@ xpc_initiate_disconnect(int ch_number) /* * To disconnect a channel, and reflect it back to all who may be waiting. * - * >>> An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by - * >>> xpc_free_msgqueues(). + * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by + * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by + * xpc_disconnect_wait(). * * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN. */ @@ -1532,7 +1601,7 @@ void xpc_disconnect_channel(const int line, struct xpc_channel *ch, enum xpc_retval reason, unsigned long *irq_flags) { - u32 flags; + u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED); DBUG_ON(!spin_is_locked(&ch->lock)); @@ -1547,61 +1616,53 @@ xpc_disconnect_channel(const int line, struct xpc_channel *ch, XPC_SET_REASON(ch, reason, line); - flags = ch->flags; + ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING); /* some of these may not have been set */ ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY | XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY | XPC_C_CONNECTING | XPC_C_CONNECTED); - ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING); xpc_IPI_send_closerequest(ch, irq_flags); - if (flags & XPC_C_CONNECTED) { + if (channel_was_connected) { ch->flags |= XPC_C_WASCONNECTED; } + spin_unlock_irqrestore(&ch->lock, *irq_flags); + + /* wake all idle kthreads so they can exit */ if (atomic_read(&ch->kthreads_idle) > 0) { - /* wake all idle kthreads so they can exit */ wake_up_all(&ch->idle_wq); } - spin_unlock_irqrestore(&ch->lock, *irq_flags); - - /* wake those waiting to allocate an entry from the local msg queue */ - if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) { wake_up(&ch->msg_allocate_wq); } - /* wake those waiting for notify completion */ - - if (atomic_read(&ch->n_to_notify) > 0) { - xpc_notify_senders(ch, reason, ch->w_local_GP.put); - } - spin_lock_irqsave(&ch->lock, *irq_flags); } void -xpc_disconnected_callout(struct xpc_channel *ch) +xpc_disconnecting_callout(struct xpc_channel *ch) { /* - * Let the channel's registerer know that the channel is now + * Let the channel's registerer know that the channel is being * disconnected. We don't want to do this if the registerer was never - * informed of a connection being made, unless the disconnect was for - * abnormal reasons. + * informed of a connection being made. */ if (ch->func != NULL) { - dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, " - "channel=%d\n", ch->reason, ch->partid, ch->number); + dev_dbg(xpc_chan, "ch->func() called, reason=xpcDisconnecting," + " partid=%d, channel=%d\n", ch->partid, ch->number); - ch->func(ch->reason, ch->partid, ch->number, NULL, ch->key); + ch->func(xpcDisconnecting, ch->partid, ch->number, NULL, + ch->key); - dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, " - "channel=%d\n", ch->reason, ch->partid, ch->number); + dev_dbg(xpc_chan, "ch->func() returned, reason=" + "xpcDisconnecting, partid=%d, channel=%d\n", + ch->partid, ch->number); } } @@ -1848,7 +1909,7 @@ xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type, xpc_notify_func func, void *key) { enum xpc_retval ret = xpcSuccess; - struct xpc_notify *notify = NULL; // >>> to keep the compiler happy!! + struct xpc_notify *notify = notify; s64 put, msg_number = msg->number; diff --git a/arch/ia64/sn/kernel/xpc_main.c b/arch/ia64/sn/kernel/xpc_main.c index ed7c215..cece3c7 100644 --- a/arch/ia64/sn/kernel/xpc_main.c +++ b/arch/ia64/sn/kernel/xpc_main.c @@ -54,6 +54,7 @@ #include <linux/interrupt.h> #include <linux/slab.h> #include <linux/delay.h> +#include <linux/reboot.h> #include <asm/sn/intr.h> #include <asm/sn/sn_sal.h> #include <asm/uaccess.h> @@ -82,11 +83,17 @@ struct device *xpc_chan = &xpc_chan_dbg_subname; /* systune related variables for /proc/sys directories */ -static int xpc_hb_min = 1; -static int xpc_hb_max = 10; +static int xpc_hb_interval = XPC_HB_DEFAULT_INTERVAL; +static int xpc_hb_min_interval = 1; +static int xpc_hb_max_interval = 10; -static int xpc_hb_check_min = 10; -static int xpc_hb_check_max = 120; +static int xpc_hb_check_interval = XPC_HB_CHECK_DEFAULT_INTERVAL; +static int xpc_hb_check_min_interval = 10; +static int xpc_hb_check_max_interval = 120; + +int xpc_disengage_request_timelimit = XPC_DISENGAGE_REQUEST_DEFAULT_TIMELIMIT; +static int xpc_disengage_request_min_timelimit = 0; +static int xpc_disengage_request_max_timelimit = 120; static ctl_table xpc_sys_xpc_hb_dir[] = { { @@ -99,7 +106,8 @@ static ctl_table xpc_sys_xpc_hb_dir[] = { &proc_dointvec_minmax, &sysctl_intvec, NULL, - &xpc_hb_min, &xpc_hb_max + &xpc_hb_min_interval, + &xpc_hb_max_interval }, { 2, @@ -111,7 +119,8 @@ static ctl_table xpc_sys_xpc_hb_dir[] = { &proc_dointvec_minmax, &sysctl_intvec, NULL, - &xpc_hb_check_min, &xpc_hb_check_max + &xpc_hb_check_min_interval, + &xpc_hb_check_max_interval }, {0} }; @@ -124,6 +133,19 @@ static ctl_table xpc_sys_xpc_dir[] = { 0555, xpc_sys_xpc_hb_dir }, + { + 2, + "disengage_request_timelimit", + &xpc_disengage_request_timelimit, + sizeof(int), + 0644, + NULL, + &proc_dointvec_minmax, + &sysctl_intvec, + NULL, + &xpc_disengage_request_min_timelimit, + &xpc_disengage_request_max_timelimit + }, {0} }; static ctl_table xpc_sys_dir[] = { @@ -148,10 +170,10 @@ static DECLARE_WAIT_QUEUE_HEAD(xpc_act_IRQ_wq); static unsigned long xpc_hb_check_timeout; -/* xpc_hb_checker thread exited notification */ +/* notification that the xpc_hb_checker thread has exited */ static DECLARE_MUTEX_LOCKED(xpc_hb_checker_exited); -/* xpc_discovery thread exited notification */ +/* notification that the xpc_discovery thread has exited */ static DECLARE_MUTEX_LOCKED(xpc_discovery_exited); @@ -161,6 +183,30 @@ static struct timer_list xpc_hb_timer; static void xpc_kthread_waitmsgs(struct xpc_partition *, struct xpc_channel *); +static int xpc_system_reboot(struct notifier_block *, unsigned long, void *); +static struct notifier_block xpc_reboot_notifier = { + .notifier_call = xpc_system_reboot, +}; + + +/* + * Timer function to enforce the timelimit on the partition disengage request. + */ +static void +xpc_timeout_partition_disengage_request(unsigned long data) +{ + struct xpc_partition *part = (struct xpc_partition *) data; + + + DBUG_ON(jiffies < part->disengage_request_timeout); + + (void) xpc_partition_disengaged(part); + + DBUG_ON(part->disengage_request_timeout != 0); + DBUG_ON(xpc_partition_engaged(1UL << XPC_PARTID(part)) != 0); +} + + /* * Notify the heartbeat check thread that an IRQ has been received. */ @@ -214,12 +260,6 @@ xpc_hb_checker(void *ignore) while (!(volatile int) xpc_exiting) { - /* wait for IRQ or timeout */ - (void) wait_event_interruptible(xpc_act_IRQ_wq, - (last_IRQ_count < atomic_read(&xpc_act_IRQ_rcvd) || - jiffies >= xpc_hb_check_timeout || - (volatile int) xpc_exiting)); - dev_dbg(xpc_part, "woke up with %d ticks rem; %d IRQs have " "been received\n", (int) (xpc_hb_check_timeout - jiffies), @@ -240,6 +280,7 @@ xpc_hb_checker(void *ignore) } + /* check for outstanding IRQs */ new_IRQ_count = atomic_read(&xpc_act_IRQ_rcvd); if (last_IRQ_count < new_IRQ_count || force_IRQ != 0) { force_IRQ = 0; @@ -257,12 +298,18 @@ xpc_hb_checker(void *ignore) xpc_hb_check_timeout = jiffies + (xpc_hb_check_interval * HZ); } + + /* wait for IRQ or timeout */ + (void) wait_event_interruptible(xpc_act_IRQ_wq, + (last_IRQ_count < atomic_read(&xpc_act_IRQ_rcvd) || + jiffies >= xpc_hb_check_timeout || + (volatile int) xpc_exiting)); } dev_dbg(xpc_part, "heartbeat checker is exiting\n"); - /* mark this thread as inactive */ + /* mark this thread as having exited */ up(&xpc_hb_checker_exited); return 0; } @@ -282,7 +329,7 @@ xpc_initiate_discovery(void *ignore) dev_dbg(xpc_part, "discovery thread is exiting\n"); - /* mark this thread as inactive */ + /* mark this thread as having exited */ up(&xpc_discovery_exited); return 0; } @@ -309,7 +356,7 @@ xpc_make_first_contact(struct xpc_partition *part) "partition %d\n", XPC_PARTID(part)); /* wait a 1/4 of a second or so */ - msleep_interruptible(250); + (void) msleep_interruptible(250); if (part->act_state == XPC_P_DEACTIVATING) { return part->reason; @@ -336,7 +383,8 @@ static void xpc_channel_mgr(struct xpc_partition *part) { while (part->act_state != XPC_P_DEACTIVATING || - atomic_read(&part->nchannels_active) > 0) { + atomic_read(&part->nchannels_active) > 0 || + !xpc_partition_disengaged(part)) { xpc_process_channel_activity(part); @@ -360,7 +408,8 @@ xpc_channel_mgr(struct xpc_partition *part) (volatile u64) part->local_IPI_amo != 0 || ((volatile u8) part->act_state == XPC_P_DEACTIVATING && - atomic_read(&part->nchannels_active) == 0))); + atomic_read(&part->nchannels_active) == 0 && + xpc_partition_disengaged(part)))); atomic_set(&part->channel_mgr_requests, 1); // >>> Does it need to wakeup periodically as well? In case we @@ -482,7 +531,7 @@ xpc_activating(void *__partid) return 0; } - XPC_ALLOW_HB(partid, xpc_vars); + xpc_allow_hb(partid, xpc_vars); xpc_IPI_send_activated(part); @@ -492,6 +541,7 @@ xpc_activating(void *__partid) */ (void) xpc_partition_up(part); + xpc_disallow_hb(partid, xpc_vars); xpc_mark_partition_inactive(part); if (part->reason == xpcReactivating) { @@ -670,6 +720,7 @@ xpc_daemonize_kthread(void *args) struct xpc_partition *part = &xpc_partitions[partid]; struct xpc_channel *ch; int n_needed; + unsigned long irq_flags; daemonize("xpc%02dc%d", partid, ch_number); @@ -680,11 +731,14 @@ xpc_daemonize_kthread(void *args) ch = &part->channels[ch_number]; if (!(ch->flags & XPC_C_DISCONNECTING)) { - DBUG_ON(!(ch->flags & XPC_C_CONNECTED)); /* let registerer know that connection has been established */ - if (atomic_read(&ch->kthreads_assigned) == 1) { + spin_lock_irqsave(&ch->lock, irq_flags); + if (!(ch->flags & XPC_C_CONNECTCALLOUT)) { + ch->flags |= XPC_C_CONNECTCALLOUT; + spin_unlock_irqrestore(&ch->lock, irq_flags); + xpc_connected_callout(ch); /* @@ -699,16 +753,28 @@ xpc_daemonize_kthread(void *args) !(ch->flags & XPC_C_DISCONNECTING)) { xpc_activate_kthreads(ch, n_needed); } + } else { + spin_unlock_irqrestore(&ch->lock, irq_flags); } xpc_kthread_waitmsgs(part, ch); } - if (atomic_dec_return(&ch->kthreads_assigned) == 0 && - ((ch->flags & XPC_C_CONNECTCALLOUT) || - (ch->reason != xpcUnregistering && - ch->reason != xpcOtherUnregistering))) { - xpc_disconnected_callout(ch); + if (atomic_dec_return(&ch->kthreads_assigned) == 0) { + spin_lock_irqsave(&ch->lock, irq_flags); + if ((ch->flags & XPC_C_CONNECTCALLOUT) && + !(ch->flags & XPC_C_DISCONNECTCALLOUT)) { + ch->flags |= XPC_C_DISCONNECTCALLOUT; + spin_unlock_irqrestore(&ch->lock, irq_flags); + + xpc_disconnecting_callout(ch); + } else { + spin_unlock_irqrestore(&ch->lock, irq_flags); + } + if (atomic_dec_return(&part->nchannels_engaged) == 0) { + xpc_mark_partition_disengaged(part); + xpc_IPI_send_disengage(part); + } } @@ -740,12 +806,33 @@ xpc_create_kthreads(struct xpc_channel *ch, int needed) unsigned long irq_flags; pid_t pid; u64 args = XPC_PACK_ARGS(ch->partid, ch->number); + struct xpc_partition *part = &xpc_partitions[ch->partid]; while (needed-- > 0) { + + /* + * The following is done on behalf of the newly created + * kthread. That kthread is responsible for doing the + * counterpart to the following before it exits. + */ + (void) xpc_part_ref(part); + xpc_msgqueue_ref(ch); + if (atomic_inc_return(&ch->kthreads_assigned) == 1 && + atomic_inc_return(&part->nchannels_engaged) == 1) { + xpc_mark_partition_engaged(part); + } + pid = kernel_thread(xpc_daemonize_kthread, (void *) args, 0); if (pid < 0) { /* the fork failed */ + if (atomic_dec_return(&ch->kthreads_assigned) == 0 && + atomic_dec_return(&part->nchannels_engaged) == 0) { + xpc_mark_partition_disengaged(part); + xpc_IPI_send_disengage(part); + } + xpc_msgqueue_deref(ch); + xpc_part_deref(part); if (atomic_read(&ch->kthreads_assigned) < ch->kthreads_idle_limit) { @@ -765,14 +852,6 @@ xpc_create_kthreads(struct xpc_channel *ch, int needed) break; } - /* - * The following is done on behalf of the newly created - * kthread. That kthread is responsible for doing the - * counterpart to the following before it exits. - */ - (void) xpc_part_ref(&xpc_partitions[ch->partid]); - xpc_msgqueue_ref(ch); - atomic_inc(&ch->kthreads_assigned); ch->kthreads_created++; // >>> temporary debug only!!! } } @@ -781,87 +860,142 @@ xpc_create_kthreads(struct xpc_channel *ch, int needed) void xpc_disconnect_wait(int ch_number) { + unsigned long irq_flags; partid_t partid; struct xpc_partition *part; struct xpc_channel *ch; + int wakeup_channel_mgr; /* now wait for all callouts to the caller's function to cease */ for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) { part = &xpc_partitions[partid]; - if (xpc_part_ref(part)) { - ch = &part->channels[ch_number]; + if (!xpc_part_ref(part)) { + continue; + } -// >>> how do we keep from falling into the window between our check and going -// >>> down and coming back up where sema is re-inited? - if (ch->flags & XPC_C_SETUP) { - (void) down(&ch->teardown_sema); - } + ch = &part->channels[ch_number]; + if (!(ch->flags & XPC_C_WDISCONNECT)) { xpc_part_deref(part); + continue; + } + + (void) down(&ch->wdisconnect_sema); + + spin_lock_irqsave(&ch->lock, irq_flags); + DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED)); + wakeup_channel_mgr = 0; + + if (ch->delayed_IPI_flags) { + if (part->act_state != XPC_P_DEACTIVATING) { + spin_lock(&part->IPI_lock); + XPC_SET_IPI_FLAGS(part->local_IPI_amo, + ch->number, ch->delayed_IPI_flags); + spin_unlock(&part->IPI_lock); + wakeup_channel_mgr = 1; + } + ch->delayed_IPI_flags = 0; } + + ch->flags &= ~XPC_C_WDISCONNECT; + spin_unlock_irqrestore(&ch->lock, irq_flags); + + if (wakeup_channel_mgr) { + xpc_wakeup_channel_mgr(part); + } + + xpc_part_deref(part); } } static void -xpc_do_exit(void) +xpc_do_exit(enum xpc_retval reason) { partid_t partid; int active_part_count; struct xpc_partition *part; + unsigned long printmsg_time; - /* now it's time to eliminate our heartbeat */ - del_timer_sync(&xpc_hb_timer); - xpc_vars->heartbeating_to_mask = 0; - - /* indicate to others that our reserved page is uninitialized */ - xpc_rsvd_page->vars_pa = 0; - - /* - * Ignore all incoming interrupts. Without interupts the heartbeat - * checker won't activate any new partitions that may come up. - */ - free_irq(SGI_XPC_ACTIVATE, NULL); + /* a 'rmmod XPC' and a 'reboot' cannot both end up here together */ + DBUG_ON(xpc_exiting == 1); /* - * Cause the heartbeat checker and the discovery threads to exit. - * We don't want them attempting to activate new partitions as we - * try to deactivate the existing ones. + * Let the heartbeat checker thread and the discovery thread + * (if one is running) know that they should exit. Also wake up + * the heartbeat checker thread in case it's sleeping. */ xpc_exiting = 1; wake_up_interruptible(&xpc_act_IRQ_wq); - /* wait for the heartbeat checker thread to mark itself inactive */ - down(&xpc_hb_checker_exited); + /* ignore all incoming interrupts */ + free_irq(SGI_XPC_ACTIVATE, NULL); - /* wait for the discovery thread to mark itself inactive */ + /* wait for the discovery thread to exit */ down(&xpc_discovery_exited); + /* wait for the heartbeat checker thread to exit */ + down(&xpc_hb_checker_exited); - msleep_interruptible(300); + + /* sleep for a 1/3 of a second or so */ + (void) msleep_interruptible(300); /* wait for all partitions to become inactive */ + printmsg_time = jiffies; + do { active_part_count = 0; for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) { part = &xpc_partitions[partid]; - if (part->act_state != XPC_P_INACTIVE) { - active_part_count++; - XPC_DEACTIVATE_PARTITION(part, xpcUnloading); + if (xpc_partition_disengaged(part) && + part->act_state == XPC_P_INACTIVE) { + continue; } + + active_part_count++; + + XPC_DEACTIVATE_PARTITION(part, reason); } - if (active_part_count) - msleep_interruptible(300); - } while (active_part_count > 0); + if (active_part_count == 0) { + break; + } + if (jiffies >= printmsg_time) { + dev_info(xpc_part, "waiting for partitions to " + "deactivate/disengage, active count=%d, remote " + "engaged=0x%lx\n", active_part_count, + xpc_partition_engaged(1UL << partid)); + + printmsg_time = jiffies + + (XPC_DISENGAGE_PRINTMSG_INTERVAL * HZ); + } + + /* sleep for a 1/3 of a second or so */ + (void) msleep_interruptible(300); + + } while (1); + + DBUG_ON(xpc_partition_engaged(-1UL)); + + + /* indicate to others that our reserved page is uninitialized */ + xpc_rsvd_page->vars_pa = 0; + + /* now it's time to eliminate our heartbeat */ + del_timer_sync(&xpc_hb_timer); + DBUG_ON(xpc_vars->heartbeating_to_mask != 0); + + /* take ourselves off of the reboot_notifier_list */ + (void) unregister_reboot_notifier(&xpc_reboot_notifier); /* close down protections for IPI operations */ xpc_restrict_IPI_ops(); @@ -876,6 +1010,34 @@ xpc_do_exit(void) } +/* + * This function is called when the system is being rebooted. + */ +static int +xpc_system_reboot(struct notifier_block *nb, unsigned long event, void *unused) +{ + enum xpc_retval reason; + + + switch (event) { + case SYS_RESTART: + reason = xpcSystemReboot; + break; + case SYS_HALT: + reason = xpcSystemHalt; + break; + case SYS_POWER_OFF: + reason = xpcSystemPoweroff; + break; + default: + reason = xpcSystemGoingDown; + } + + xpc_do_exit(reason); + return NOTIFY_DONE; +} + + int __init xpc_init(void) { @@ -891,11 +1053,11 @@ xpc_init(void) /* * xpc_remote_copy_buffer is used as a temporary buffer for bte_copy'ng - * both a partition's reserved page and its XPC variables. Its size was - * based on the size of a reserved page. So we need to ensure that the - * XPC variables will fit as well. + * various portions of a partition's reserved page. Its size is based + * on the size of the reserved page header and part_nasids mask. So we + * need to ensure that the other items will fit as well. */ - if (XPC_VARS_ALIGNED_SIZE > XPC_RSVD_PAGE_ALIGNED_SIZE) { + if (XPC_RP_VARS_SIZE > XPC_RP_HEADER_SIZE + XP_NASID_MASK_BYTES) { dev_err(xpc_part, "xpc_remote_copy_buffer is not big enough\n"); return -EPERM; } @@ -924,6 +1086,12 @@ xpc_init(void) spin_lock_init(&part->act_lock); part->act_state = XPC_P_INACTIVE; XPC_SET_REASON(part, 0, 0); + + init_timer(&part->disengage_request_timer); + part->disengage_request_timer.function = + xpc_timeout_partition_disengage_request; + part->disengage_request_timer.data = (unsigned long) part; + part->setup_state = XPC_P_UNSET; init_waitqueue_head(&part->teardown_wq); atomic_set(&part->references, 0); @@ -980,6 +1148,13 @@ xpc_init(void) } + /* add ourselves to the reboot_notifier_list */ + ret = register_reboot_notifier(&xpc_reboot_notifier); + if (ret != 0) { + dev_warn(xpc_part, "can't register reboot notifier\n"); + } + + /* * Set the beating to other partitions into motion. This is * the last requirement for other partitions' discovery to @@ -1001,6 +1176,9 @@ xpc_init(void) /* indicate to others that our reserved page is uninitialized */ xpc_rsvd_page->vars_pa = 0; + /* take ourselves off of the reboot_notifier_list */ + (void) unregister_reboot_notifier(&xpc_reboot_notifier); + del_timer_sync(&xpc_hb_timer); free_irq(SGI_XPC_ACTIVATE, NULL); xpc_restrict_IPI_ops(); @@ -1024,7 +1202,7 @@ xpc_init(void) /* mark this new thread as a non-starter */ up(&xpc_discovery_exited); - xpc_do_exit(); + xpc_do_exit(xpcUnloading); return -EBUSY; } @@ -1043,7 +1221,7 @@ module_init(xpc_init); void __exit xpc_exit(void) { - xpc_do_exit(); + xpc_do_exit(xpcUnloading); } module_exit(xpc_exit); @@ -1060,3 +1238,7 @@ module_param(xpc_hb_check_interval, int, 0); MODULE_PARM_DESC(xpc_hb_check_interval, "Number of seconds between " "heartbeat checks."); +module_param(xpc_disengage_request_timelimit, int, 0); +MODULE_PARM_DESC(xpc_disengage_request_timelimit, "Number of seconds to wait " + "for disengage request to complete."); + diff --git a/arch/ia64/sn/kernel/xpc_partition.c b/arch/ia64/sn/kernel/xpc_partition.c index 72ef330..581e113 100644 --- a/arch/ia64/sn/kernel/xpc_partition.c +++ b/arch/ia64/sn/kernel/xpc_partition.c @@ -47,13 +47,16 @@ static u64 xpc_sh2_IPI_access3; u64 xpc_prot_vec[MAX_NUMNODES]; -/* this partition's reserved page */ +/* this partition's reserved page pointers */ struct xpc_rsvd_page *xpc_rsvd_page; - -/* this partition's XPC variables (within the reserved page) */ +static u64 *xpc_part_nasids; +static u64 *xpc_mach_nasids; struct xpc_vars *xpc_vars; struct xpc_vars_part *xpc_vars_part; +static int xp_nasid_mask_bytes; /* actual size in bytes of nasid mask */ +static int xp_nasid_mask_words; /* actual size in words of nasid mask */ + /* * For performance reasons, each entry of xpc_partitions[] is cacheline @@ -65,20 +68,16 @@ struct xpc_partition xpc_partitions[XP_MAX_PARTITIONS + 1]; /* - * Generic buffer used to store a local copy of the remote partitions - * reserved page or XPC variables. + * Generic buffer used to store a local copy of portions of a remote + * partition's reserved page (either its header and part_nasids mask, + * or its vars). * * xpc_discovery runs only once and is a seperate thread that is * very likely going to be processing in parallel with receiving * interrupts. */ -char ____cacheline_aligned - xpc_remote_copy_buffer[XPC_RSVD_PAGE_ALIGNED_SIZE]; - - -/* systune related variables */ -int xpc_hb_interval = XPC_HB_DEFAULT_INTERVAL; -int xpc_hb_check_interval = XPC_HB_CHECK_DEFAULT_TIMEOUT; +char ____cacheline_aligned xpc_remote_copy_buffer[XPC_RP_HEADER_SIZE + + XP_NASID_MASK_BYTES]; /* @@ -86,13 +85,16 @@ int xpc_hb_check_interval = XPC_HB_CHECK_DEFAULT_TIMEOUT; * for that nasid. This function returns 0 on any error. */ static u64 -xpc_get_rsvd_page_pa(int nasid, u64 buf, u64 buf_size) +xpc_get_rsvd_page_pa(int nasid) { bte_result_t bte_res; s64 status; u64 cookie = 0; u64 rp_pa = nasid; /* seed with nasid */ u64 len = 0; + u64 buf = buf; + u64 buf_len = 0; + void *buf_base = NULL; while (1) { @@ -108,13 +110,22 @@ xpc_get_rsvd_page_pa(int nasid, u64 buf, u64 buf_size) break; } - if (len > buf_size) { - dev_err(xpc_part, "len (=0x%016lx) > buf_size\n", len); - status = SALRET_ERROR; - break; + if (L1_CACHE_ALIGN(len) > buf_len) { + if (buf_base != NULL) { + kfree(buf_base); + } + buf_len = L1_CACHE_ALIGN(len); + buf = (u64) xpc_kmalloc_cacheline_aligned(buf_len, + GFP_KERNEL, &buf_base); + if (buf_base == NULL) { + dev_err(xpc_part, "unable to kmalloc " + "len=0x%016lx\n", buf_len); + status = SALRET_ERROR; + break; + } } - bte_res = xp_bte_copy(rp_pa, ia64_tpa(buf), buf_size, + bte_res = xp_bte_copy(rp_pa, ia64_tpa(buf), buf_len, (BTE_NOTIFY | BTE_WACQUIRE), NULL); if (bte_res != BTE_SUCCESS) { dev_dbg(xpc_part, "xp_bte_copy failed %i\n", bte_res); @@ -123,6 +134,10 @@ xpc_get_rsvd_page_pa(int nasid, u64 buf, u64 buf_size) } } + if (buf_base != NULL) { + kfree(buf_base); + } + if (status != SALRET_OK) { rp_pa = 0; } @@ -141,15 +156,15 @@ xpc_rsvd_page_init(void) { struct xpc_rsvd_page *rp; AMO_t *amos_page; - u64 rp_pa, next_cl, nasid_array = 0; + u64 rp_pa, nasid_array = 0; int i, ret; /* get the local reserved page's address */ - rp_pa = xpc_get_rsvd_page_pa(cnodeid_to_nasid(0), - (u64) xpc_remote_copy_buffer, - XPC_RSVD_PAGE_ALIGNED_SIZE); + preempt_disable(); + rp_pa = xpc_get_rsvd_page_pa(cpuid_to_nasid(smp_processor_id())); + preempt_enable(); if (rp_pa == 0) { dev_err(xpc_part, "SAL failed to locate the reserved page\n"); return NULL; @@ -164,12 +179,19 @@ xpc_rsvd_page_init(void) rp->version = XPC_RP_VERSION; - /* - * Place the XPC variables on the cache line following the - * reserved page structure. - */ - next_cl = (u64) rp + XPC_RSVD_PAGE_ALIGNED_SIZE; - xpc_vars = (struct xpc_vars *) next_cl; + /* establish the actual sizes of the nasid masks */ + if (rp->SAL_version == 1) { + /* SAL_version 1 didn't set the nasids_size field */ + rp->nasids_size = 128; + } + xp_nasid_mask_bytes = rp->nasids_size; + xp_nasid_mask_words = xp_nasid_mask_bytes / 8; + + /* setup the pointers to the various items in the reserved page */ + xpc_part_nasids = XPC_RP_PART_NASIDS(rp); + xpc_mach_nasids = XPC_RP_MACH_NASIDS(rp); + xpc_vars = XPC_RP_VARS(rp); + xpc_vars_part = XPC_RP_VARS_PART(rp); /* * Before clearing xpc_vars, see if a page of AMOs had been previously @@ -221,33 +243,32 @@ xpc_rsvd_page_init(void) amos_page = (AMO_t *) TO_AMO((u64) amos_page); } + /* clear xpc_vars */ memset(xpc_vars, 0, sizeof(struct xpc_vars)); - /* - * Place the XPC per partition specific variables on the cache line - * following the XPC variables structure. - */ - next_cl += XPC_VARS_ALIGNED_SIZE; - memset((u64 *) next_cl, 0, sizeof(struct xpc_vars_part) * - XP_MAX_PARTITIONS); - xpc_vars_part = (struct xpc_vars_part *) next_cl; - xpc_vars->vars_part_pa = __pa(next_cl); - xpc_vars->version = XPC_V_VERSION; xpc_vars->act_nasid = cpuid_to_nasid(0); xpc_vars->act_phys_cpuid = cpu_physical_id(0); + xpc_vars->vars_part_pa = __pa(xpc_vars_part); + xpc_vars->amos_page_pa = ia64_tpa((u64) amos_page); xpc_vars->amos_page = amos_page; /* save for next load of XPC */ - /* - * Initialize the activation related AMO variables. - */ - xpc_vars->act_amos = xpc_IPI_init(XP_MAX_PARTITIONS); - for (i = 1; i < XP_NASID_MASK_WORDS; i++) { - xpc_IPI_init(i + XP_MAX_PARTITIONS); + /* clear xpc_vars_part */ + memset((u64 *) xpc_vars_part, 0, sizeof(struct xpc_vars_part) * + XP_MAX_PARTITIONS); + + /* initialize the activate IRQ related AMO variables */ + for (i = 0; i < xp_nasid_mask_words; i++) { + (void) xpc_IPI_init(XPC_ACTIVATE_IRQ_AMOS + i); } - /* export AMO page's physical address to other partitions */ - xpc_vars->amos_page_pa = ia64_tpa((u64) xpc_vars->amos_page); + + /* initialize the engaged remote partitions related AMO variables */ + (void) xpc_IPI_init(XPC_ENGAGED_PARTITIONS_AMO); + (void) xpc_IPI_init(XPC_DISENGAGE_REQUEST_AMO); + + /* timestamp of when reserved page was setup by XPC */ + rp->stamp = CURRENT_TIME; /* * This signifies to the remote partition that our reserved @@ -387,6 +408,11 @@ xpc_check_remote_hb(void) remote_vars = (struct xpc_vars *) xpc_remote_copy_buffer; for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) { + + if (xpc_exiting) { + break; + } + if (partid == sn_partition_id) { continue; } @@ -401,7 +427,7 @@ xpc_check_remote_hb(void) /* pull the remote_hb cache line */ bres = xp_bte_copy(part->remote_vars_pa, ia64_tpa((u64) remote_vars), - XPC_VARS_ALIGNED_SIZE, + XPC_RP_VARS_SIZE, (BTE_NOTIFY | BTE_WACQUIRE), NULL); if (bres != BTE_SUCCESS) { XPC_DEACTIVATE_PARTITION(part, @@ -417,7 +443,7 @@ xpc_check_remote_hb(void) if (((remote_vars->heartbeat == part->last_heartbeat) && (remote_vars->kdb_status == 0)) || - !XPC_HB_ALLOWED(sn_partition_id, remote_vars)) { + !xpc_hb_allowed(sn_partition_id, remote_vars)) { XPC_DEACTIVATE_PARTITION(part, xpcNoHeartbeat); continue; @@ -429,31 +455,31 @@ xpc_check_remote_hb(void) /* - * Get a copy of the remote partition's rsvd page. + * Get a copy of a portion of the remote partition's rsvd page. * * remote_rp points to a buffer that is cacheline aligned for BTE copies and - * assumed to be of size XPC_RSVD_PAGE_ALIGNED_SIZE. + * is large enough to contain a copy of their reserved page header and + * part_nasids mask. */ static enum xpc_retval xpc_get_remote_rp(int nasid, u64 *discovered_nasids, - struct xpc_rsvd_page *remote_rp, u64 *remote_rsvd_page_pa) + struct xpc_rsvd_page *remote_rp, u64 *remote_rp_pa) { int bres, i; /* get the reserved page's physical address */ - *remote_rsvd_page_pa = xpc_get_rsvd_page_pa(nasid, (u64) remote_rp, - XPC_RSVD_PAGE_ALIGNED_SIZE); - if (*remote_rsvd_page_pa == 0) { + *remote_rp_pa = xpc_get_rsvd_page_pa(nasid); + if (*remote_rp_pa == 0) { return xpcNoRsvdPageAddr; } - /* pull over the reserved page structure */ + /* pull over the reserved page header and part_nasids mask */ - bres = xp_bte_copy(*remote_rsvd_page_pa, ia64_tpa((u64) remote_rp), - XPC_RSVD_PAGE_ALIGNED_SIZE, + bres = xp_bte_copy(*remote_rp_pa, ia64_tpa((u64) remote_rp), + XPC_RP_HEADER_SIZE + xp_nasid_mask_bytes, (BTE_NOTIFY | BTE_WACQUIRE), NULL); if (bres != BTE_SUCCESS) { return xpc_map_bte_errors(bres); @@ -461,8 +487,11 @@ xpc_get_remote_rp(int nasid, u64 *discovered_nasids, if (discovered_nasids != NULL) { - for (i = 0; i < XP_NASID_MASK_WORDS; i++) { - discovered_nasids[i] |= remote_rp->part_nasids[i]; + u64 *remote_part_nasids = XPC_RP_PART_NASIDS(remote_rp); + + + for (i = 0; i < xp_nasid_mask_words; i++) { + discovered_nasids[i] |= remote_part_nasids[i]; } } @@ -489,10 +518,10 @@ xpc_get_remote_rp(int nasid, u64 *discovered_nasids, /* - * Get a copy of the remote partition's XPC variables. + * Get a copy of the remote partition's XPC variables from the reserved page. * * remote_vars points to a buffer that is cacheline aligned for BTE copies and - * assumed to be of size XPC_VARS_ALIGNED_SIZE. + * assumed to be of size XPC_RP_VARS_SIZE. */ static enum xpc_retval xpc_get_remote_vars(u64 remote_vars_pa, struct xpc_vars *remote_vars) @@ -508,7 +537,7 @@ xpc_get_remote_vars(u64 remote_vars_pa, struct xpc_vars *remote_vars) /* pull over the cross partition variables */ bres = xp_bte_copy(remote_vars_pa, ia64_tpa((u64) remote_vars), - XPC_VARS_ALIGNED_SIZE, + XPC_RP_VARS_SIZE, (BTE_NOTIFY | BTE_WACQUIRE), NULL); if (bres != BTE_SUCCESS) { return xpc_map_bte_errors(bres); @@ -524,7 +553,56 @@ xpc_get_remote_vars(u64 remote_vars_pa, struct xpc_vars *remote_vars) /* - * Prior code has determine the nasid which generated an IPI. Inspect + * Update the remote partition's info. + */ +static void +xpc_update_partition_info(struct xpc_partition *part, u8 remote_rp_version, + struct timespec *remote_rp_stamp, u64 remote_rp_pa, + u64 remote_vars_pa, struct xpc_vars *remote_vars) +{ + part->remote_rp_version = remote_rp_version; + dev_dbg(xpc_part, " remote_rp_version = 0x%016lx\n", + part->remote_rp_version); + + part->remote_rp_stamp = *remote_rp_stamp; + dev_dbg(xpc_part, " remote_rp_stamp (tv_sec = 0x%lx tv_nsec = 0x%lx\n", + part->remote_rp_stamp.tv_sec, part->remote_rp_stamp.tv_nsec); + + part->remote_rp_pa = remote_rp_pa; + dev_dbg(xpc_part, " remote_rp_pa = 0x%016lx\n", part->remote_rp_pa); + + part->remote_vars_pa = remote_vars_pa; + dev_dbg(xpc_part, " remote_vars_pa = 0x%016lx\n", + part->remote_vars_pa); + + part->last_heartbeat = remote_vars->heartbeat; + dev_dbg(xpc_part, " last_heartbeat = 0x%016lx\n", + part->last_heartbeat); + + part->remote_vars_part_pa = remote_vars->vars_part_pa; + dev_dbg(xpc_part, " remote_vars_part_pa = 0x%016lx\n", + part->remote_vars_part_pa); + + part->remote_act_nasid = remote_vars->act_nasid; + dev_dbg(xpc_part, " remote_act_nasid = 0x%x\n", + part->remote_act_nasid); + + part->remote_act_phys_cpuid = remote_vars->act_phys_cpuid; + dev_dbg(xpc_part, " remote_act_phys_cpuid = 0x%x\n", + part->remote_act_phys_cpuid); + + part->remote_amos_page_pa = remote_vars->amos_page_pa; + dev_dbg(xpc_part, " remote_amos_page_pa = 0x%lx\n", + part->remote_amos_page_pa); + + part->remote_vars_version = remote_vars->version; + dev_dbg(xpc_part, " remote_vars_version = 0x%x\n", + part->remote_vars_version); +} + + +/* + * Prior code has determined the nasid which generated an IPI. Inspect * that nasid to determine if its partition needs to be activated or * deactivated. * @@ -542,8 +620,12 @@ xpc_identify_act_IRQ_req(int nasid) { struct xpc_rsvd_page *remote_rp; struct xpc_vars *remote_vars; - u64 remote_rsvd_page_pa; + u64 remote_rp_pa; u64 remote_vars_pa; + int remote_rp_version; + int reactivate = 0; + int stamp_diff; + struct timespec remote_rp_stamp = { 0, 0 }; partid_t partid; struct xpc_partition *part; enum xpc_retval ret; @@ -553,7 +635,7 @@ xpc_identify_act_IRQ_req(int nasid) remote_rp = (struct xpc_rsvd_page *) xpc_remote_copy_buffer; - ret = xpc_get_remote_rp(nasid, NULL, remote_rp, &remote_rsvd_page_pa); + ret = xpc_get_remote_rp(nasid, NULL, remote_rp, &remote_rp_pa); if (ret != xpcSuccess) { dev_warn(xpc_part, "unable to get reserved page from nasid %d, " "which sent interrupt, reason=%d\n", nasid, ret); @@ -561,6 +643,10 @@ xpc_identify_act_IRQ_req(int nasid) } remote_vars_pa = remote_rp->vars_pa; + remote_rp_version = remote_rp->version; + if (XPC_SUPPORTS_RP_STAMP(remote_rp_version)) { + remote_rp_stamp = remote_rp->stamp; + } partid = remote_rp->partid; part = &xpc_partitions[partid]; @@ -586,44 +672,117 @@ xpc_identify_act_IRQ_req(int nasid) "%ld:0x%lx\n", (int) nasid, (int) partid, part->act_IRQ_rcvd, remote_vars->heartbeat, remote_vars->heartbeating_to_mask); + if (xpc_partition_disengaged(part) && + part->act_state == XPC_P_INACTIVE) { - if (part->act_state == XPC_P_INACTIVE) { + xpc_update_partition_info(part, remote_rp_version, + &remote_rp_stamp, remote_rp_pa, + remote_vars_pa, remote_vars); - part->remote_rp_pa = remote_rsvd_page_pa; - dev_dbg(xpc_part, " remote_rp_pa = 0x%016lx\n", - part->remote_rp_pa); + if (XPC_SUPPORTS_DISENGAGE_REQUEST(part->remote_vars_version)) { + if (xpc_partition_disengage_requested(1UL << partid)) { + /* + * Other side is waiting on us to disengage, + * even though we already have. + */ + return; + } + } else { + /* other side doesn't support disengage requests */ + xpc_clear_partition_disengage_request(1UL << partid); + } - part->remote_vars_pa = remote_vars_pa; - dev_dbg(xpc_part, " remote_vars_pa = 0x%016lx\n", - part->remote_vars_pa); + xpc_activate_partition(part); + return; + } - part->last_heartbeat = remote_vars->heartbeat; - dev_dbg(xpc_part, " last_heartbeat = 0x%016lx\n", - part->last_heartbeat); + DBUG_ON(part->remote_rp_version == 0); + DBUG_ON(part->remote_vars_version == 0); + + if (!XPC_SUPPORTS_RP_STAMP(part->remote_rp_version)) { + DBUG_ON(XPC_SUPPORTS_DISENGAGE_REQUEST(part-> + remote_vars_version)); + + if (!XPC_SUPPORTS_RP_STAMP(remote_rp_version)) { + DBUG_ON(XPC_SUPPORTS_DISENGAGE_REQUEST(remote_vars-> + version)); + /* see if the other side rebooted */ + if (part->remote_amos_page_pa == + remote_vars->amos_page_pa && + xpc_hb_allowed(sn_partition_id, + remote_vars)) { + /* doesn't look that way, so ignore the IPI */ + return; + } + } - part->remote_vars_part_pa = remote_vars->vars_part_pa; - dev_dbg(xpc_part, " remote_vars_part_pa = 0x%016lx\n", - part->remote_vars_part_pa); + /* + * Other side rebooted and previous XPC didn't support the + * disengage request, so we don't need to do anything special. + */ - part->remote_act_nasid = remote_vars->act_nasid; - dev_dbg(xpc_part, " remote_act_nasid = 0x%x\n", - part->remote_act_nasid); + xpc_update_partition_info(part, remote_rp_version, + &remote_rp_stamp, remote_rp_pa, + remote_vars_pa, remote_vars); + part->reactivate_nasid = nasid; + XPC_DEACTIVATE_PARTITION(part, xpcReactivating); + return; + } - part->remote_act_phys_cpuid = remote_vars->act_phys_cpuid; - dev_dbg(xpc_part, " remote_act_phys_cpuid = 0x%x\n", - part->remote_act_phys_cpuid); + DBUG_ON(!XPC_SUPPORTS_DISENGAGE_REQUEST(part->remote_vars_version)); - part->remote_amos_page_pa = remote_vars->amos_page_pa; - dev_dbg(xpc_part, " remote_amos_page_pa = 0x%lx\n", - part->remote_amos_page_pa); + if (!XPC_SUPPORTS_RP_STAMP(remote_rp_version)) { + DBUG_ON(!XPC_SUPPORTS_DISENGAGE_REQUEST(remote_vars->version)); - xpc_activate_partition(part); + /* + * Other side rebooted and previous XPC did support the + * disengage request, but the new one doesn't. + */ + + xpc_clear_partition_engaged(1UL << partid); + xpc_clear_partition_disengage_request(1UL << partid); - } else if (part->remote_amos_page_pa != remote_vars->amos_page_pa || - !XPC_HB_ALLOWED(sn_partition_id, remote_vars)) { + xpc_update_partition_info(part, remote_rp_version, + &remote_rp_stamp, remote_rp_pa, + remote_vars_pa, remote_vars); + reactivate = 1; + + } else { + DBUG_ON(!XPC_SUPPORTS_DISENGAGE_REQUEST(remote_vars->version)); + stamp_diff = xpc_compare_stamps(&part->remote_rp_stamp, + &remote_rp_stamp); + if (stamp_diff != 0) { + DBUG_ON(stamp_diff >= 0); + + /* + * Other side rebooted and the previous XPC did support + * the disengage request, as does the new one. + */ + + DBUG_ON(xpc_partition_engaged(1UL << partid)); + DBUG_ON(xpc_partition_disengage_requested(1UL << + partid)); + + xpc_update_partition_info(part, remote_rp_version, + &remote_rp_stamp, remote_rp_pa, + remote_vars_pa, remote_vars); + reactivate = 1; + } + } + + if (!xpc_partition_disengaged(part)) { + /* still waiting on other side to disengage from us */ + return; + } + + if (reactivate) { part->reactivate_nasid = nasid; XPC_DEACTIVATE_PARTITION(part, xpcReactivating); + + } else if (XPC_SUPPORTS_DISENGAGE_REQUEST(part->remote_vars_version) && + xpc_partition_disengage_requested(1UL << partid)) { + XPC_DEACTIVATE_PARTITION(part, xpcOtherGoingDown); } } @@ -643,14 +802,17 @@ xpc_identify_act_IRQ_sender(void) u64 nasid; /* remote nasid */ int n_IRQs_detected = 0; AMO_t *act_amos; - struct xpc_rsvd_page *rp = (struct xpc_rsvd_page *) xpc_rsvd_page; - act_amos = xpc_vars->act_amos; + act_amos = xpc_vars->amos_page + XPC_ACTIVATE_IRQ_AMOS; /* scan through act AMO variable looking for non-zero entries */ - for (word = 0; word < XP_NASID_MASK_WORDS; word++) { + for (word = 0; word < xp_nasid_mask_words; word++) { + + if (xpc_exiting) { + break; + } nasid_mask = xpc_IPI_receive(&act_amos[word]); if (nasid_mask == 0) { @@ -668,7 +830,7 @@ xpc_identify_act_IRQ_sender(void) * remote nasid in our reserved pages machine mask. * This is used in the event of module reload. */ - rp->mach_nasids[word] |= nasid_mask; + xpc_mach_nasids[word] |= nasid_mask; /* locate the nasid(s) which sent interrupts */ @@ -688,6 +850,55 @@ xpc_identify_act_IRQ_sender(void) /* + * See if the other side has responded to a partition disengage request + * from us. + */ +int +xpc_partition_disengaged(struct xpc_partition *part) +{ + partid_t partid = XPC_PARTID(part); + int disengaged; + + + disengaged = (xpc_partition_engaged(1UL << partid) == 0); + if (part->disengage_request_timeout) { + if (!disengaged) { + if (jiffies < part->disengage_request_timeout) { + /* timelimit hasn't been reached yet */ + return 0; + } + + /* + * Other side hasn't responded to our disengage + * request in a timely fashion, so assume it's dead. + */ + + xpc_clear_partition_engaged(1UL << partid); + disengaged = 1; + } + part->disengage_request_timeout = 0; + + /* cancel the timer function, provided it's not us */ + if (!in_interrupt()) { + del_singleshot_timer_sync(&part-> + disengage_request_timer); + } + + DBUG_ON(part->act_state != XPC_P_DEACTIVATING && + part->act_state != XPC_P_INACTIVE); + if (part->act_state != XPC_P_INACTIVE) { + xpc_wakeup_channel_mgr(part); + } + + if (XPC_SUPPORTS_DISENGAGE_REQUEST(part->remote_vars_version)) { + xpc_cancel_partition_disengage_request(part); + } + } + return disengaged; +} + + +/* * Mark specified partition as active. */ enum xpc_retval @@ -721,7 +932,6 @@ xpc_deactivate_partition(const int line, struct xpc_partition *part, enum xpc_retval reason) { unsigned long irq_flags; - partid_t partid = XPC_PARTID(part); spin_lock_irqsave(&part->act_lock, irq_flags); @@ -749,17 +959,27 @@ xpc_deactivate_partition(const int line, struct xpc_partition *part, spin_unlock_irqrestore(&part->act_lock, irq_flags); - XPC_DISALLOW_HB(partid, xpc_vars); + if (XPC_SUPPORTS_DISENGAGE_REQUEST(part->remote_vars_version)) { + xpc_request_partition_disengage(part); + xpc_IPI_send_disengage(part); - dev_dbg(xpc_part, "bringing partition %d down, reason = %d\n", partid, - reason); + /* set a timelimit on the disengage request */ + part->disengage_request_timeout = jiffies + + (xpc_disengage_request_timelimit * HZ); + part->disengage_request_timer.expires = + part->disengage_request_timeout; + add_timer(&part->disengage_request_timer); + } + + dev_dbg(xpc_part, "bringing partition %d down, reason = %d\n", + XPC_PARTID(part), reason); - xpc_partition_down(part, reason); + xpc_partition_going_down(part, reason); } /* - * Mark specified partition as active. + * Mark specified partition as inactive. */ void xpc_mark_partition_inactive(struct xpc_partition *part) @@ -792,9 +1012,10 @@ xpc_discovery(void) void *remote_rp_base; struct xpc_rsvd_page *remote_rp; struct xpc_vars *remote_vars; - u64 remote_rsvd_page_pa; + u64 remote_rp_pa; u64 remote_vars_pa; int region; + int region_size; int max_regions; int nasid; struct xpc_rsvd_page *rp; @@ -804,7 +1025,8 @@ xpc_discovery(void) enum xpc_retval ret; - remote_rp = xpc_kmalloc_cacheline_aligned(XPC_RSVD_PAGE_ALIGNED_SIZE, + remote_rp = xpc_kmalloc_cacheline_aligned(XPC_RP_HEADER_SIZE + + xp_nasid_mask_bytes, GFP_KERNEL, &remote_rp_base); if (remote_rp == NULL) { return; @@ -812,13 +1034,13 @@ xpc_discovery(void) remote_vars = (struct xpc_vars *) remote_rp; - discovered_nasids = kmalloc(sizeof(u64) * XP_NASID_MASK_WORDS, + discovered_nasids = kmalloc(sizeof(u64) * xp_nasid_mask_words, GFP_KERNEL); if (discovered_nasids == NULL) { kfree(remote_rp_base); return; } - memset(discovered_nasids, 0, sizeof(u64) * XP_NASID_MASK_WORDS); + memset(discovered_nasids, 0, sizeof(u64) * xp_nasid_mask_words); rp = (struct xpc_rsvd_page *) xpc_rsvd_page; @@ -827,11 +1049,19 @@ xpc_discovery(void) * nodes that can comprise an access protection grouping. The access * protection is in regards to memory, IOI and IPI. */ -//>>> move the next two #defines into either include/asm-ia64/sn/arch.h or -//>>> include/asm-ia64/sn/addrs.h -#define SH1_MAX_REGIONS 64 -#define SH2_MAX_REGIONS 256 - max_regions = is_shub2() ? SH2_MAX_REGIONS : SH1_MAX_REGIONS; + max_regions = 64; + region_size = sn_region_size; + + switch (region_size) { + case 128: + max_regions *= 2; + case 64: + max_regions *= 2; + case 32: + max_regions *= 2; + region_size = 16; + DBUG_ON(!is_shub2()); + } for (region = 0; region < max_regions; region++) { @@ -841,8 +1071,8 @@ xpc_discovery(void) dev_dbg(xpc_part, "searching region %d\n", region); - for (nasid = (region * sn_region_size * 2); - nasid < ((region + 1) * sn_region_size * 2); + for (nasid = (region * region_size * 2); + nasid < ((region + 1) * region_size * 2); nasid += 2) { if ((volatile int) xpc_exiting) { @@ -852,14 +1082,14 @@ xpc_discovery(void) dev_dbg(xpc_part, "checking nasid %d\n", nasid); - if (XPC_NASID_IN_ARRAY(nasid, rp->part_nasids)) { + if (XPC_NASID_IN_ARRAY(nasid, xpc_part_nasids)) { dev_dbg(xpc_part, "PROM indicates Nasid %d is " "part of the local partition; skipping " "region\n", nasid); break; } - if (!(XPC_NASID_IN_ARRAY(nasid, rp->mach_nasids))) { + if (!(XPC_NASID_IN_ARRAY(nasid, xpc_mach_nasids))) { dev_dbg(xpc_part, "PROM indicates Nasid %d was " "not on Numa-Link network at reset\n", nasid); @@ -877,7 +1107,7 @@ xpc_discovery(void) /* pull over the reserved page structure */ ret = xpc_get_remote_rp(nasid, discovered_nasids, - remote_rp, &remote_rsvd_page_pa); + remote_rp, &remote_rp_pa); if (ret != xpcSuccess) { dev_dbg(xpc_part, "unable to get reserved page " "from nasid %d, reason=%d\n", nasid, @@ -948,6 +1178,13 @@ xpc_discovery(void) remote_vars->act_nasid, remote_vars->act_phys_cpuid); + if (XPC_SUPPORTS_DISENGAGE_REQUEST(remote_vars-> + version)) { + part->remote_amos_page_pa = + remote_vars->amos_page_pa; + xpc_mark_partition_disengaged(part); + xpc_cancel_partition_disengage_request(part); + } xpc_IPI_send_activate(remote_vars); } } @@ -974,12 +1211,12 @@ xpc_initiate_partid_to_nasids(partid_t partid, void *nasid_mask) return xpcPartitionDown; } - part_nasid_pa = part->remote_rp_pa + - (u64) &((struct xpc_rsvd_page *) 0)->part_nasids; + memset(nasid_mask, 0, XP_NASID_MASK_BYTES); + + part_nasid_pa = (u64) XPC_RP_PART_NASIDS(part->remote_rp_pa); bte_res = xp_bte_copy(part_nasid_pa, ia64_tpa((u64) nasid_mask), - L1_CACHE_ALIGN(XP_NASID_MASK_BYTES), - (BTE_NOTIFY | BTE_WACQUIRE), NULL); + xp_nasid_mask_bytes, (BTE_NOTIFY | BTE_WACQUIRE), NULL); return xpc_map_bte_errors(bte_res); } diff --git a/include/asm-ia64/sn/xp.h b/include/asm-ia64/sn/xp.h index 75a2f39..49faf8f 100644 --- a/include/asm-ia64/sn/xp.h +++ b/include/asm-ia64/sn/xp.h @@ -217,7 +217,17 @@ enum xpc_retval { xpcInvalidPartid, /* 42: invalid partition ID */ xpcLocalPartid, /* 43: local partition ID */ - xpcUnknownReason /* 44: unknown reason -- must be last in list */ + xpcOtherGoingDown, /* 44: other side going down, reason unknown */ + xpcSystemGoingDown, /* 45: system is going down, reason unknown */ + xpcSystemHalt, /* 46: system is being halted */ + xpcSystemReboot, /* 47: system is being rebooted */ + xpcSystemPoweroff, /* 48: system is being powered off */ + + xpcDisconnecting, /* 49: channel disconnecting (closing) */ + + xpcOpenCloseError, /* 50: channel open/close protocol error */ + + xpcUnknownReason /* 51: unknown reason -- must be last in list */ }; @@ -342,7 +352,7 @@ typedef void (*xpc_notify_func)(enum xpc_retval reason, partid_t partid, * * The 'func' field points to the function to call when aynchronous * notification is required for such events as: a connection established/lost, - * or an incomming message received, or an error condition encountered. A + * or an incoming message received, or an error condition encountered. A * non-NULL 'func' field indicates that there is an active registration for * the channel. */ |