1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
/*
* Copyright (c) 2021 Cisco and/or its affiliates.
*/
#include "subscription.h"
#include <hicn/util/vector.h>
#include <hicn/util/log.h>
/*----------------------------------------------------------------------------*
* Topics and events
*----------------------------------------------------------------------------*/
bool topics_contains(hc_topics_t topic_list, hc_topic_t topic) {
return ((topic_list) & (topic));
}
#define topic_is_set(topic_list, topic_index) \
((topic_list) & (1 << (topic_index)))
/*----------------------------------------------------------------------------*
* Subscriptions
*----------------------------------------------------------------------------*/
struct subscription_table_s {
unsigned *table[TOPIC_N];
};
subscription_table_t *subscription_table_create() {
subscription_table_t *subscriptions = malloc(sizeof(subscription_table_t));
for (int i = 0; i < NUM_TOPICS; i++)
vector_init(subscriptions->table[i], 0, 0);
return subscriptions;
}
void subscription_table_free(subscription_table_t *subscriptions) {
for (int i = 0; i < NUM_TOPICS; i++) vector_free(subscriptions->table[i]);
free(subscriptions);
}
int subscription_table_add_topics_for_connection(
subscription_table_t *subscriptions, hc_topics_t topics,
unsigned connection_id) {
bool is_subscription_already_present = false;
for (int topic_index = 0; topic_index < NUM_TOPICS; topic_index++) {
if (topic_is_set(topics, topic_index)) {
int num_duplicates = vector_remove_unordered(
subscriptions->table[topic_index], connection_id);
int ret = vector_push(subscriptions->table[topic_index], connection_id);
if (ret < 0) {
ERROR("Unable to perform subscription for connection %d, topic %s",
connection_id, object_type_str(topic_index));
return -1;
}
if (num_duplicates > 0) {
DEBUG("Connection %d had already a subscription for topic %s",
connection_id, object_type_str(topic_index));
is_subscription_already_present = true;
}
}
}
return is_subscription_already_present ? -2 : 0;
}
int subscription_table_remove_topics_for_connection(
subscription_table_t *subscriptions, hc_topics_t topics,
unsigned connection_id) {
int num_subscriptions_removed = 0;
for (int topic_index = 0; topic_index < NUM_TOPICS; topic_index++) {
if (topic_is_set(topics, topic_index)) {
int num_duplicates = vector_remove_unordered(
subscriptions->table[topic_index], connection_id);
if (num_duplicates <= 0) {
continue;
}
num_subscriptions_removed++;
}
}
return num_subscriptions_removed;
}
hc_topics_t subscription_table_get_topics_for_connection(
subscription_table_t *subscriptions, unsigned connection_id) {
hc_topics_t topics = 0;
for (int topic_index = 0; topic_index < NUM_TOPICS; topic_index++) {
unsigned *conn_id;
bool found = false;
vector_foreach(subscriptions->table[topic_index], conn_id, {
if (*conn_id == connection_id) {
found = true;
break;
}
});
if (found) topics |= (1 << topic_index);
}
return topics;
}
unsigned *subscription_table_get_connections_for_topic(
subscription_table_t *subscriptions, hc_topic_t topic) {
int topic_index = object_from_topic(topic);
return subscriptions->table[topic_index];
}
void subscription_table_print(subscription_table_t *subscriptions) {
for (int topic_index = OBJECT_TYPE_UNDEFINED + 1; topic_index < NUM_TOPICS;
topic_index++) {
printf("topic %s (%lu subscription/s) from connection/s: [ ",
object_type_str(topic_index),
(unsigned long)vector_len(subscriptions->table[topic_index]));
unsigned *connection_id;
vector_foreach(subscriptions->table[topic_index], connection_id,
{ printf("%d ", *connection_id); });
printf("]\n");
}
}
|