Miałem ten sam problem, nie byłem pewien, czy znalazłeś rozwiązanie, czy nie, ale udało mi się osiągnąć coś podobnego, wykonując następujące czynności. Najpierw dodałem wyzwalacz do mojego stołu
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
Spowoduje to ustawienie wyzwalacza w tabeli za każdym razem, gdy wiersz, zostanie zaktualizowany, usunięty lub wstawiony. Następnie wywoła skonfigurowaną przeze mnie funkcję wyzwalacza, która wygląda mniej więcej tak:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
Umożliwi mi to „słuchanie” każdej z tych aktualizacji z mojego projektu Spring Boot i wyśle cały wiersz jako ładunek. Następnie w moim projekcie Spring Boot skonfigurowałem połączenie z moją bazą danych.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
Z tym Autowire (wstrzykiwanie zależności) go do konstruktora w mojej klasie usług i rzucam go do klasy PostgressqlConnection r2dbc w następujący sposób:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
Teraz chcemy "nasłuchiwać" naszej tabeli i otrzymywać powiadomienie, gdy wykonamy jakąś aktualizację naszej tabeli. W tym celu konfigurujemy metodę inicjalizacji, która jest wykonywana po wstrzyknięciu zależności za pomocą adnotacji @PostContruct
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
Zauważ, że słuchamy dowolnej nazwy, którą umieściliśmy w metodzie pg_notify. Chcemy również skonfigurować metodę zamykania połączenia, gdy fasola ma zostać wyrzucona, na przykład:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
Teraz po prostu tworzę metodę, która zwraca strumień tego, co jest obecnie w mojej tabeli, a także scalam go z moimi powiadomieniami, jak powiedziałem, zanim powiadomienia przyjdą jako json, więc musiałem go zdeserializować i postanowiłem użyć ObjectMapper. Będzie to więc wyglądało mniej więcej tak:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
Mam nadzieję, że to pomoże. Na zdrowie!