import { loginSelectors } from '@Mesh/store/login/login.selectors';
import { AuthenticationService } from '@Mesh/core/services/authentication.service';
import { Injectable } from '@angular/core';
import { MESSAGING_WEBSOCKET_URL } from '@Env/environment';
import { Subject } from 'rxjs';
import { RSocketClient, JsonSerializer, IdentitySerializer } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
import { Store } from '@ngrx/store';
import { AppState } from '@Mesh/store/app.state';

/**
 * Root-provided service that opens an RSocket connection over WebSocket to
 * `MESSAGING_WEBSOCKET_URL`, requests the `get.manage.events.route` stream and
 * republishes the `data` of every received payload on `messagesSubject$`.
 */
@Injectable({
  providedIn: 'root',
})
export class WebsocketService {
  /** Route requested as soon as the connection is established. */
  private static readonly EVENTS_ROUTE = 'get.manage.events.route';

  /**
   * Demand signalled to the stream on subscription: 2^31 - 1, the largest
   * signed 32-bit integer, so the stream is effectively unbounded.
   */
  private static readonly MAX_REQUEST_N = 2147483647;

  /** Emits the `data` field of each payload received on the events stream. */
  public messagesSubject$ = new Subject<any>();

  /**
   * Most recent value emitted by `loginSelectors.selectToken`. It is only
   * forwarded as request data, so its concrete shape is irrelevant here.
   */
  private token: unknown;

  constructor(private authService: AuthenticationService, private store: Store<AppState>) {
    // Keep a local copy of the token so that connect() can read it synchronously.
    this.store.select(loginSelectors.selectToken).subscribe((token) => {
      this.token = token;
    });
  }

  /**
   * Creates a new RSocket client, connects it and subscribes to the events
   * stream. Stream payloads are forwarded to `messagesSubject$`; connection
   * and stream failures are only logged to the console.
   *
   * NOTE(review): every call builds a new client and nothing here retains or
   * closes it, so calling this more than once presumably results in several
   * live connections feeding the same subject - confirm against callers.
   */
  connect(): void {
    const client = new RSocketClient({
      serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer,
      },
      setup: {
        // ms between keepalive frames sent to the server
        keepAlive: 60000,
        // ms timeout if no keepalive response is received
        lifetime: 180000,
        // format of `data`
        dataMimeType: 'application/json',
        // format of `metadata`
        metadataMimeType: 'message/x.rsocket.routing.v0',
      },
      transport: new RSocketWebSocketClient({
        url: MESSAGING_WEBSOCKET_URL,
      }),
    });

    client.connect().subscribe({
      onComplete: (socket) => {
        socket.connectionStatus().subscribe((event) => console.log(event));
        // The token is read at request time, i.e. whatever the store has
        // emitted so far (possibly nothing yet).
        socket
          .requestStream({
            data: {
              token: this.token,
            },
            metadata: WebsocketService.encodeRoute(WebsocketService.EVENTS_ROUTE),
          })
          .subscribe({
            onComplete: () => console.log('Request-stream completed'),
            onNext: (payload) => {
              this.messagesSubject$.next(payload.data);
            },
            onSubscribe: (subscription) => {
              subscription.request(WebsocketService.MAX_REQUEST_N);
            },
            onError: (error) => console.error(`Request-stream error:${error.message}`),
          });
      },
      // This handler fires when the connection itself fails, not the stream,
      // so it is labelled differently from the stream's onError above.
      onError: (error) => console.error(`Connection error:${error.message}`),
    });
  }

  /**
   * Builds the routing metadata string: a single character whose code is the
   * route's length, followed by the route itself.
   */
  private static encodeRoute(route: string): string {
    return String.fromCharCode(route.length) + route;
  }
}
