AFTIPCTopologyWatcher.java

/*
 * junixsocket
 *
 * Copyright 2009-2024 Christian Kohlschütter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.newsclub.net.unix.tipc;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

import org.newsclub.net.unix.AFSocketType;
import org.newsclub.net.unix.AFTIPCSocketAddress;

import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;

/**
 * Provides access to the TIPC topology service.
 *
 * @author Christian Kohlschütter
 */
@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
public class AFTIPCTopologyWatcher implements Closeable {
  private final int defaultTimeout;
  private final AFTIPCDatagramChannel channel;
  private final Selector selector;
  private final AtomicBoolean doLoop = new AtomicBoolean(false);
  private final AtomicBoolean running = new AtomicBoolean(false);

  /**
   * Creates an {@link AFTIPCTopologyWatcher} whose subscription requests do not time out by
   * default.
   *
   * @throws IOException on error.
   */
  public AFTIPCTopologyWatcher() throws IOException {
    this(AFTIPCTopologySubscription.TIPC_WAIT_FOREVER);
  }

  /**
   * Creates an {@link AFTIPCTopologyWatcher} whose subscription requests use the given default
   * timeout.
   *
   * @param defaultTimeoutSeconds The timeout in seconds (or
   *          {@link AFTIPCTopologySubscription#TIPC_WAIT_FOREVER};
   * @throws IOException on error.
   */
  public AFTIPCTopologyWatcher(int defaultTimeoutSeconds) throws IOException {
    this.defaultTimeout = defaultTimeoutSeconds;
    this.channel = AFTIPCDatagramSocket.newInstance(AFSocketType.SOCK_SEQPACKET).getChannel();
    this.selector = channel.provider().openSelector();
    channel.connect(AFTIPCSocketAddress.ofTopologyService());
    channel.configureBlocking(false);
  }

  @SuppressWarnings("all")
  @Deprecated
  protected final void finalize() {
  }

  /**
   * Watches for all port changes.
   *
   * @return The subscription object.
   * @throws IOException on error.
   * @see #cancelSubscription(AFTIPCTopologySubscription)
   */
  public final AFTIPCTopologySubscription addPortSubscription() throws IOException {
    return addPortSubscription(0, ~0);
  }

  /**
   * Watches for port changes of the given port ("port" meaning TIPC port, not TCP).
   *
   * @param port The port.
   * @return The subscription object.
   * @throws IOException on error.
   * @see #cancelSubscription(AFTIPCTopologySubscription)
   */
  public final AFTIPCTopologySubscription addPortSubscription(int port) throws IOException {
    return addPortSubscription(port, port);
  }

  /**
   * Watches for port changes within the given range ("port" meaning TIPC port, not TCP).
   *
   * @param lower The lower value of the port range.
   * @param upper The upper value of the port range.
   * @return The subscription object.
   * @throws IOException on error.
   * @see #cancelSubscription(AFTIPCTopologySubscription)
   */
  public final AFTIPCTopologySubscription addPortSubscription(int lower, int upper)
      throws IOException {
    return sendMessage(new AFTIPCTopologySubscription(AFTIPCTopologySubscription.TIPC_NODE_STATE,
        lower, upper, AFTIPCTopologySubscription.Flags.TIPC_SUB_PORTS, defaultTimeout,
        AFTIPCTopologySubscription.USR_EMPTY));
  }

  /**
   * Watches for all link state changes.
   *
   * @return The subscription object.
   * @throws IOException on error.
   * @see #cancelSubscription(AFTIPCTopologySubscription)
   */
  public final AFTIPCTopologySubscription addLinkStateSubscription() throws IOException {
    return sendMessage(new AFTIPCTopologySubscription(AFTIPCTopologySubscription.TIPC_LINK_STATE, 0,
        ~0, AFTIPCTopologySubscription.Flags.NONE, defaultTimeout,
        AFTIPCTopologySubscription.USR_EMPTY));
  }

  /**
   * Watches for service changes of the given service type, matching any instance.
   *
   * @param type The service type.
   * @return The subscription object.
   * @throws IOException on error.
   * @see #cancelSubscription(AFTIPCTopologySubscription)
   */
  public final AFTIPCTopologySubscription addServiceSubscription(int type) throws IOException {
    return addServiceSubscription(type, 0, ~0);
  }

  /**
   * Watches for service changes of the given service type, matching only the specified instance.
   *
   * @param type The service type.
   * @param instance The instance to match.
   * @return The subscription object.
   * @throws IOException on error.
   * @see #cancelSubscription(AFTIPCTopologySubscription)
   */
  public final AFTIPCTopologySubscription addServiceSubscription(int type, int instance)
      throws IOException {
    return addServiceSubscription(type, instance, instance);
  }

  /**
   * Watches for service changes of the given service type and instance range.
   *
   * @param type The service type.
   * @param lower The lower value of the instance range.
   * @param upper The upper value of the instance range.
   * @return The subscription object.
   * @throws IOException on error.
   * @see #cancelSubscription(AFTIPCTopologySubscription)
   */
  public final AFTIPCTopologySubscription addServiceSubscription(int type, int lower, int upper)
      throws IOException {
    return sendMessage(new AFTIPCTopologySubscription(type, lower, upper,
        AFTIPCTopologySubscription.Flags.TIPC_SUB_SERVICE, defaultTimeout,
        AFTIPCTopologySubscription.USR_EMPTY));
  }

  /**
   * Cancels a previously added service subscription.
   *
   * @param sub The subscription to cancel.
   * @throws IOException on error.
   */
  public final void cancelSubscription(AFTIPCTopologySubscription sub) throws IOException {
    sendMessage(sub.toCancellation());
  }

  /**
   * Sends a manually crafted subscription message to the TIPC topology server. You usually don't
   * need to do this directly; use the
   * {@code #addPortSubscription(int, int)}/{@link #cancelSubscription(AFTIPCTopologySubscription)}
   * methods instead.
   *
   * @param sub The subscription message.
   * @return The very message.
   * @throws IOException on error.
   */
  public final AFTIPCTopologySubscription sendMessage(AFTIPCTopologySubscription sub)
      throws IOException {
    channel.write(sub.toBuffer());
    return sub;
  }

  /**
   * Runs a receive loop until {@link #stopLoop()} or {@link #close()} is called.
   *
   * This method returns after the run loop terminates.
   *
   * @throws IOException on error.
   */
  @SuppressWarnings("null")
  public final void runLoop() throws IOException {
    if (!running.compareAndSet(false, true)) {
      throw new IllegalStateException("Already running");
    }

    ByteBuffer buf = ByteBuffer.allocate(64);
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

    try {
      doLoop.set(true);
      while (!Thread.interrupted() && doLoop.get()) {
        int n = selector.select();
        if (!key.isValid() || !doLoop.get()) {
          break;
        }
        if (n > 0) {
          channel.receive(buf);
          @SuppressWarnings("cast")
          AFTIPCTopologyEvent event = AFTIPCTopologyEvent.readFromBuffer((ByteBuffer) buf.flip());
          onEvent(event);
          buf.clear();
        }
      }
    } finally {
      key.cancel();
      running.set(false);
    }
  }

  /**
   * Called for every event encountered by the run loop.
   *
   * @param event The event.
   * @throws IOException on error. Any exception will terminate the run loop.
   * @see #runLoop()
   */
  protected void onEvent(AFTIPCTopologyEvent event) throws IOException {
  }

  /**
   * Called upon {@link #close()}.
   *
   * @throws IOException on error.
   */
  protected void onClose() throws IOException {
  }

  /**
   * Checks if the watcher run loop is running.
   *
   * @return {@code true} if running.
   * @see #runLoop()
   */
  public boolean isRunning() {
    return running.get();
  }

  /**
   * Stops the run loop.
   */
  public final void stopLoop() {
    doLoop.set(false);
    selector.wakeup();
  }

  /**
   * Closes this instance.
   */
  @Override
  public final void close() throws IOException {
    channel.close();
    stopLoop();
    onClose();
  }
}