AFTIPCTopologySubscription.java
/*
* junixsocket
*
* Copyright 2009-2024 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.newsclub.net.unix.tipc;
import static org.newsclub.net.unix.AFTIPCSocketAddress.AddressType.formatTIPCInt;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.WeakHashMap;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.newsclub.net.unix.NamedIntegerBitmask;
/**
* An "event subscription" to be used with {@link AFTIPCTopologyWatcher}.
*
* @author Christian Kohlschütter
*/
@NonNullByDefault
public final class AFTIPCTopologySubscription {
static final byte[] USR_EMPTY = new byte[8];
private static final WeakHashMap<ByteBuffer, // NOPMD.LooseCoupling
@Nullable AFTIPCTopologySubscription> SUBSCRIPTIONS = new WeakHashMap<>();
/**
* Special timeout value meaning "never timeout".
*/
public static final int TIPC_WAIT_FOREVER = -1;
/**
* The "node state" cluster topology monitor mode.
*
* When TIPC establishes contact with another node, it does internally create a binding {type =
* TIPC_NODE_STATE, instance = peer node hash number} in the binding table. This makes it possible
* for applications on a node to keep track of reachable peer nodes at any time. *
*/
public static final int TIPC_NODE_STATE = 0;
/**
* The "link state" cluster connectivity monitor mode.
*
* When TIPC establishes a new link to another node, it does internally create a binding {type =
* TIPC_LINK_STATE, instance = peer node hash number} in the binding table. This makes it possible
* for applications on a node to keep track of working links to peer nodes at any time.
*
* This type of binding differs from the topology subscription binding ({@link #TIPC_NODE_STATE})
* in that there may be two links, and hence two bindings, to keep track of for each peer node.
* Although this binding type only is published with node visibility, it is possible to combine it
* with remote node topology subscriptions to obtain a full and continuous matrix view of the
* connectivity in the cluster.
*/
public static final int TIPC_LINK_STATE = 2;
private static final int MESSAGE_LENGTH = 28;
private final int type;
private final int lower;
private final int upper;
private final Flags flags;
private final int timeout;
private final byte[] usrHandle;
/**
* Some flags used in the subscription request.
*/
public static final class Flags extends NamedIntegerBitmask<Flags> {
private static final long serialVersionUID = 1L;
/**
* No flags set.
*/
public static final Flags NONE = new Flags("NONE", 0);
/**
* Event at each match.
*/
public static final Flags TIPC_SUB_PORTS;
/**
* Event at first up/last down.
*/
public static final Flags TIPC_SUB_SERVICE;
/**
* Cancel a subscription.
*/
public static final Flags TIPC_SUB_CANCEL;
private static final @NonNull Flags[] VALUES = {
TIPC_SUB_PORTS = new Flags("TIPC_SUB_PORTS", 1), //
TIPC_SUB_SERVICE = new Flags("TIPC_SUB_SERVICE", 2), //
TIPC_SUB_CANCEL = new Flags("TIPC_SUB_CANCEL", 4), //
};
private Flags(@Nullable String name, int flags) {
super(name, flags);
}
/**
* Returns a {@link Flags} instance given an integer value.
*
* @param v The value.
* @return The instance.
*/
public static Flags ofValue(int v) {
return resolve(VALUES, NONE, Flags::new, v);
}
/**
* Returns a {@link Flags} instance representing the combination of the given list of
* {@link Flags} flags.
*
* @param flags The flags (zero or more values).
* @return The instance.
*/
public static Flags withFlags(@NonNull Flags... flags) {
return resolve(VALUES, NONE, Flags::new, flags);
}
/**
* Combines the given {@link Flags} instance with another one.
*
* @param other The other instance.
* @return The combined instance.
*/
@Override
public Flags combineWith(Flags other) {
return combineWith(VALUES, NONE, Flags::new, other);
}
}
/**
* Creates a new subscription message that does not time out.
*
* @param type The service type (any service, particularly {@link #TIPC_NODE_STATE} and
* {@link #TIPC_LINK_STATE}.
* @param lower The lower instance.
* @param upper The upper instance.
* @param flags Any flaas (use {@link Flags#NONE} if you don't have any).
*/
public AFTIPCTopologySubscription(int type, int lower, int upper, Flags flags) {
this(type, lower, upper, flags, TIPC_WAIT_FOREVER, USR_EMPTY);
}
/**
* Creates a new subscription message.
*
* @param type The service type (any service, particularly {@link #TIPC_NODE_STATE} and
* {@link #TIPC_LINK_STATE}.
* @param lower The lower instance.
* @param upper The upper instance.
* @param flags Any flaas (use {@link Flags#NONE} if you don't have any).
* @param timeoutSeconds The timeout (in seconds), or {@value #TIPC_WAIT_FOREVER} if this should
* never time out.
*/
public AFTIPCTopologySubscription(int type, int lower, int upper, Flags flags,
int timeoutSeconds) {
this(type, lower, upper, flags, timeoutSeconds, USR_EMPTY);
}
/**
* Creates a new subscription message.
*
* @param type The service type (any service, particularly {@link #TIPC_NODE_STATE} and
* {@link #TIPC_LINK_STATE}.
* @param lower The lower instance.
* @param upper The upper instance.
* @param flags Any flaas (use {@link Flags#NONE} if you don't have any).
* @param timeoutSeconds The timeout (in seconds), or {@value #TIPC_WAIT_FOREVER} if this should
* never time out.
* @param usrHandle A custom 8-byte message that is included in {@link AFTIPCTopologyEvent}
* messages.
*/
public AFTIPCTopologySubscription(int type, int lower, int upper, Flags flags, int timeoutSeconds,
byte[] usrHandle) {
this.type = type;
this.lower = lower;
this.upper = upper;
this.flags = flags == null ? Flags.NONE : flags;
this.timeout = timeoutSeconds;
this.usrHandle = new byte[8];
if (usrHandle != null) {
if (usrHandle.length > 8) {
throw new IllegalArgumentException("User handle too long");
} else {
System.arraycopy(usrHandle, 0, this.usrHandle, 0, usrHandle.length);
}
}
}
@SuppressWarnings({"null", "cast"})
static AFTIPCTopologySubscription readFromBuffer(ByteBuffer buf) {
buf = (ByteBuffer) buf.slice().limit(MESSAGE_LENGTH);
AFTIPCTopologySubscription sub = SUBSCRIPTIONS.get(buf);
if (sub != null) {
return sub;
}
int type = buf.getInt();
int lower = buf.getInt();
int upper = buf.getInt();
int timeout = buf.getInt();
Flags flags = Flags.ofValue(buf.getInt());
byte[] usrHandle = new byte[8];
buf.get(usrHandle, 0, 8);
buf.flip();
sub = new AFTIPCTopologySubscription(type, lower, upper, flags, timeout, usrHandle);
SUBSCRIPTIONS.put(buf, sub);
return sub;
}
@SuppressWarnings("null")
ByteBuffer writeToBuffer(ByteBuffer buf) {
buf = buf.order(ByteOrder.BIG_ENDIAN);
buf.putInt(type);
buf.putInt(lower);
buf.putInt(upper);
buf.putInt(timeout);
buf.putInt(flags.value());
buf.put(usrHandle);
return buf;
}
/**
* Converts this subscription message to a new {@link ByteBuffer}.
*
* @return The new buffer, ready to read from.
*/
@SuppressWarnings({"null", "cast"})
public ByteBuffer toBuffer() {
return (ByteBuffer) writeToBuffer(ByteBuffer.allocate(MESSAGE_LENGTH)).flip();
}
@Override
public String toString() {
return super.toString() + "[" + type + "@" + formatTIPCInt(lower) + "-" + formatTIPCInt(upper)
+ ";flags=" + flags + ";timeout=" + timeout + ";usrHandle=" + String.format(Locale.ENGLISH,
"%16s", new BigInteger(1, usrHandle).toString(16)).replace(' ', '0') + "]";
}
/**
* Creates an {@link AFTIPCTopologySubscription} that cancels this subscription.
*
* Note that a cancellation cannot be cancelled again.
*
* @return The new {@link AFTIPCTopologySubscription}.
*/
public AFTIPCTopologySubscription toCancellation() {
return new AFTIPCTopologySubscription(type, lower, upper, Flags.TIPC_SUB_CANCEL, timeout,
usrHandle);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + Arrays.hashCode(usrHandle);
result = prime * result + Objects.hash(flags, lower, timeout, type, upper);
return result;
}
@Override
public boolean equals(@Nullable Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof AFTIPCTopologySubscription)) {
return false;
}
AFTIPCTopologySubscription other = (AFTIPCTopologySubscription) obj;
return Objects.equals(flags, other.flags) && lower == other.lower && timeout == other.timeout
&& type == other.type && upper == other.upper && Arrays.equals(usrHandle, other.usrHandle);
}
/**
* Returns the service type.
*
* @return The type.
*/
public int getType() {
return type;
}
/**
* Returns the lower instance value.
*
* @return The lower instance value.
*/
public int getLower() {
return lower;
}
/**
* Returns the upper instance value.
*
* @return The upper instance value.
*/
public int getUpper() {
return upper;
}
/**
* Returns the flags.
*
* @return The flags.
*/
public Flags getFlags() {
return flags;
}
/**
* Returns the timeout, in seconds (or {@link #TIPC_WAIT_FOREVER} for "never timeout").
*
* @return The timeout.
*/
public int getTimeout() {
return timeout;
}
/**
* Returns the 8-byte user handle.
*
* @return The user handle.
*/
public byte[] getUsrHandle() {
return usrHandle.clone();
}
/**
* Returns {@code true} iff the subscription has the
* {@link AFTIPCTopologySubscription.Flags#TIPC_SUB_PORTS} flag set.
*
* @return {@code true} if this is a "port" subscription.
*/
public boolean isPort() {
return flags.hasFlag(Flags.TIPC_SUB_PORTS);
}
/**
* Returns {@code true} iff the subscription has the
* {@link AFTIPCTopologySubscription.Flags#TIPC_SUB_SERVICE} flag set.
*
* @return {@code true} if this is a "service" subscription.
*/
public boolean isService() {
return flags.hasFlag(Flags.TIPC_SUB_SERVICE);
}
/**
* Returns {@code true} iff the subscription has the
* {@link AFTIPCTopologySubscription.Flags#TIPC_SUB_CANCEL} flag set.
*
* @return {@code true} if this is a "cancellation" subscription request.
*/
public boolean isCancellation() {
return flags.hasFlag(Flags.TIPC_SUB_CANCEL);
}
}