phc2sys: event subscription

Add support for subscribing to events (run_pmc_subscribe) and receiving and
handling of received events (run_pmc_events).

Add initial support for port status changes.

Signed-off-by: Jiri Benc <jbenc@redhat.com>
master
Jiri Benc 2014-06-11 21:35:19 +02:00 committed by Richard Cochran
parent 48f6a31cc3
commit 175ca678e7
1 changed files with 114 additions and 2 deletions

116
phc2sys.c
View File

@ -41,6 +41,7 @@
#include "ds.h" #include "ds.h"
#include "fsm.h" #include "fsm.h"
#include "missing.h" #include "missing.h"
#include "notification.h"
#include "phc.h" #include "phc.h"
#include "pi.h" #include "pi.h"
#include "pmc_common.h" #include "pmc_common.h"
@ -59,12 +60,19 @@
#define PHC_PPS_OFFSET_LIMIT 10000000 #define PHC_PPS_OFFSET_LIMIT 10000000
#define PMC_UPDATE_INTERVAL (60 * NS_PER_SEC) #define PMC_UPDATE_INTERVAL (60 * NS_PER_SEC)
#define PMC_SUBSCRIBE_DURATION 180 /* 3 minutes */
/* Note that PMC_SUBSCRIBE_DURATION has to be longer than
* PMC_UPDATE_INTERVAL otherwise subscription will time out before it is
* renewed.
*/
struct clock { struct clock {
LIST_ENTRY(clock) list; LIST_ENTRY(clock) list;
clockid_t clkid; clockid_t clkid;
int sysoff_supported; int sysoff_supported;
int is_utc; int is_utc;
int state;
int new_state;
struct servo *servo; struct servo *servo;
enum servo_state servo_state; enum servo_state servo_state;
char *device; char *device;
@ -78,6 +86,7 @@ struct clock {
struct port { struct port {
LIST_ENTRY(port) list; LIST_ENTRY(port) list;
unsigned int number; unsigned int number;
int state;
struct clock *clock; struct clock *clock;
}; };
@ -95,6 +104,7 @@ struct node {
struct pmc *pmc; struct pmc *pmc;
int pmc_ds_requested; int pmc_ds_requested;
uint64_t pmc_last_update; uint64_t pmc_last_update;
int state_changed;
LIST_HEAD(port_head, port) ports; LIST_HEAD(port_head, port) ports;
LIST_HEAD(clock_head, clock) clocks; LIST_HEAD(clock_head, clock) clocks;
struct clock *master; struct clock *master;
@ -533,6 +543,81 @@ static void *get_mgt_data(struct ptp_message *msg)
return mgt->data; return mgt->data;
} }
static int normalize_state(int state)
{
if (state != PS_MASTER && state != PS_SLAVE &&
state != PS_PRE_MASTER && state != PS_UNCALIBRATED) {
/* treat any other state as "not a master nor a slave" */
state = PS_DISABLED;
}
return state;
}
static int clock_compute_state(struct node *node, struct clock *clock)
{
struct port *p;
int state = PS_DISABLED;
LIST_FOREACH(p, &node->ports, list) {
if (p->clock != clock)
continue;
/* PS_SLAVE takes the highest precedence, PS_UNCALIBRATED
* after that, PS_MASTER is third, PS_PRE_MASTER fourth and
* all of that overrides PS_DISABLED, which corresponds
* nicely with the numerical values */
if (p->state > state)
state = p->state;
}
return state;
}
static int recv_subscribed(struct node *node, struct ptp_message *msg,
int excluded)
{
int mgt_id, state;
struct portDS *pds;
struct port *port;
struct clock *clock;
mgt_id = get_mgt_id(msg);
if (mgt_id == excluded)
return 0;
switch (mgt_id) {
case PORT_DATA_SET:
pds = get_mgt_data(msg);
port = port_get(node, pds->portIdentity.portNumber);
if (!port) {
pr_info("received data for unknown port %s",
pid2str(&pds->portIdentity));
return 1;
}
state = normalize_state(pds->portState);
if (port->state != state) {
pr_info("port %s changed state",
pid2str(&pds->portIdentity));
port->state = state;
clock = port->clock;
state = clock_compute_state(node, clock);
if (clock->state != state) {
clock->new_state = state;
node->state_changed = 1;
}
}
return 1;
}
return 0;
}
static void send_subscription(struct node *node)
{
struct subscribe_events_np sen;
memset(&sen, 0, sizeof(sen));
sen.duration = PMC_SUBSCRIBE_DURATION;
sen.bitmask[0] = 1 << NOTIFY_PORT_STATE;
pmc_send_set_action(node->pmc, SUBSCRIBE_EVENTS_NP, &sen, sizeof(sen));
}
static int init_pmc(struct node *node, int domain_number) static int init_pmc(struct node *node, int domain_number)
{ {
node->pmc = pmc_create(TRANS_UDS, "/var/run/phc2sys", 0, node->pmc = pmc_create(TRANS_UDS, "/var/run/phc2sys", 0,
@ -555,7 +640,7 @@ static int run_pmc(struct node *node, int timeout, int ds_id,
while (1) { while (1) {
pollfd[0].fd = pmc_get_transport_fd(node->pmc); pollfd[0].fd = pmc_get_transport_fd(node->pmc);
pollfd[0].events = POLLIN|POLLPRI; pollfd[0].events = POLLIN|POLLPRI;
if (!node->pmc_ds_requested) if (!node->pmc_ds_requested && ds_id >= 0)
pollfd[0].events |= POLLOUT; pollfd[0].events |= POLLOUT;
cnt = poll(pollfd, N_FD, timeout); cnt = poll(pollfd, N_FD, timeout);
@ -572,7 +657,14 @@ static int run_pmc(struct node *node, int timeout, int ds_id,
/* Send a new request if there are no pending messages. */ /* Send a new request if there are no pending messages. */
if ((pollfd[0].revents & POLLOUT) && if ((pollfd[0].revents & POLLOUT) &&
!(pollfd[0].revents & (POLLIN|POLLPRI))) { !(pollfd[0].revents & (POLLIN|POLLPRI))) {
pmc_send_get_action(node->pmc, ds_id); switch (ds_id) {
case SUBSCRIBE_EVENTS_NP:
send_subscription(node);
break;
default:
pmc_send_get_action(node->pmc, ds_id);
break;
}
node->pmc_ds_requested = 1; node->pmc_ds_requested = 1;
} }
@ -585,6 +677,7 @@ static int run_pmc(struct node *node, int timeout, int ds_id,
continue; continue;
if (!is_msg_mgt(*msg) || if (!is_msg_mgt(*msg) ||
recv_subscribed(node, *msg, ds_id) ||
get_mgt_id(*msg) != ds_id) { get_mgt_id(*msg) != ds_id) {
msg_put(*msg); msg_put(*msg);
*msg = NULL; *msg = NULL;
@ -645,6 +738,25 @@ static int run_pmc_get_utc_offset(struct node *node, int timeout)
return 1; return 1;
} }
static int run_pmc_subscribe(struct node *node, int timeout)
{
struct ptp_message *msg;
int res;
res = run_pmc(node, timeout, SUBSCRIBE_EVENTS_NP, &msg);
if (res <= 0)
return res;
msg_put(msg);
return 1;
}
static void run_pmc_events(struct node *node)
{
struct ptp_message *msg;
run_pmc(node, 0, -1, &msg);
}
static void close_pmc(struct node *node) static void close_pmc(struct node *node)
{ {
pmc_destroy(node->pmc); pmc_destroy(node->pmc);