Add websocket handling for queue updates

This commit is contained in:
2021-07-26 07:23:17 -04:00
parent dbf21bc7bb
commit cf68a37616
6 changed files with 143 additions and 3 deletions

View File

@@ -1,5 +1,7 @@
import { NextPage, NextPageContext } from 'next';
import Link from 'next/link';
import React, { useEffect, useState } from 'react';
import Client from 'socket.io-client';
import Jobs from '../../src/react-components/Jobs';
@@ -16,6 +18,39 @@ type PageContext = NextPageContext & {
// react component
const Page: NextPage<PageProps> = ({ bookmarklet, jobs }) => {
const [_socket, setSocket] = useState<any>();
const [currentJobs, setCurrentJobs] = useState<any>(jobs);
useEffect(() => {
const newSocket = Client();
setSocket(newSocket);
newSocket.on('job.added', (payload: any) => {
setCurrentJobs((current: any[]) => {
return [payload.job, ...current.slice(0, -1)];
});
});
newSocket.on('job.updated', (payload: any) => {
setCurrentJobs((current: { id: any }[]) => {
const foundIndex = current.findIndex(
(cj: { id: any }) => cj.id == payload.job.id,
);
if (foundIndex > -1) {
current[foundIndex] = payload.job;
}
return [...current];
});
});
return () => {
newSocket.disconnect();
setSocket(undefined);
};
}, []);
return (
<>
<section className="section">
@@ -64,7 +99,7 @@ const Page: NextPage<PageProps> = ({ bookmarklet, jobs }) => {
<div className="container">
<h2 className="subtitle is-4 has-text-centered">Recent Downloads</h2>
<Jobs jobs={jobs} />
<Jobs jobs={currentJobs} />
</div>
</section>
</>

View File

@@ -1,10 +1,13 @@
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { ScheduleModule } from '@nestjs/schedule';
import { RenderModule } from 'nest-next';
import Next from 'next';
import configuration from './config/configuration';
import { JobGateway } from './job.gateway';
import { WebModule } from './web/web.module';
import { YtdlModule } from './ytdl/ytdl.module';
@@ -15,6 +18,10 @@ import { YtdlModule } from './ytdl/ytdl.module';
isGlobal: true,
load: [configuration],
}),
EventEmitterModule.forRoot({
wildcard: true,
}),
ScheduleModule.forRoot(),
RenderModule.forRootAsync(
Next({ dev: process.env.NODE_ENV !== 'production' }),
),
@@ -32,5 +39,6 @@ import { YtdlModule } from './ytdl/ytdl.module';
YtdlModule,
WebModule,
],
providers: [JobGateway],
})
export class AppModule {}

74
src/job.gateway.ts Normal file
View File

@@ -0,0 +1,74 @@
import { Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import {
OnGatewayConnection,
OnGatewayDisconnect,
WebSocketGateway,
WebSocketServer,
} from '@nestjs/websockets';
import { pick } from 'lodash';
import { Server, Socket } from 'socket.io';
import { JobEvent } from './types';
@WebSocketGateway()
export class JobGateway implements OnGatewayConnection, OnGatewayDisconnect {
private readonly logger = new Logger(JobGateway.name);
@WebSocketServer()
private server: Server | undefined;
private clients: Socket[] = [];
handleConnection(client: Socket) {
this.clients.push(client);
this.logger.verbose('handleConnection', client);
}
handleDisconnect(client: Socket) {
for (let i = 0; i < this.clients.length; i++) {
if (this.clients[i] === client) {
this.clients.splice(i, 1);
break;
}
}
this.logger.verbose('handleDisconnect', client);
}
private broadcast(event: string, message: any) {
const broadCastMessage = JSON.stringify(message);
for (const c of this.clients) {
c.send(event, broadCastMessage);
}
}
@OnEvent('job.added')
async sendAddedJob(payload: JobEvent) {
if (this.server && payload.job) {
const state = await payload.job.getState();
const progress = payload.job.progress();
payload.job = {
...pick(payload.job, ['id', 'name', 'data']),
progress: progress ? `${progress}%` : 'n/a',
state: state,
};
this.server.emit('job.added', payload);
}
}
@OnEvent('job.updated')
async sendJobUpdate(payload: JobEvent) {
if (this.server && payload.job) {
const state = await payload.job.getState();
const progress = payload.job.progress();
payload.job = {
...pick(payload.job, ['id', 'name', 'data']),
progress: progress ? `${progress}%` : 'n/a',
state: state,
};
this.server.emit('job.updated', payload);
}
}
}

View File

@@ -8,3 +8,8 @@ export class QueueDto {
title?: string;
extractor?: string;
}
export type JobEvent = {
job?: any;
err?: Error;
};

View File

@@ -12,11 +12,12 @@ import {
UsePipes,
ValidationPipe,
} from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Queue } from 'bull';
import { pick, trimEnd } from 'lodash';
import { YtdlService } from 'src/ytdl/ytdl.service';
import { QueueDto, UploadDto } from '../types';
import { YtdlService } from '../ytdl/ytdl.service';
@Controller()
export class WebController {
@@ -25,6 +26,7 @@ export class WebController {
constructor(
@InjectQueue('vidgrab') private readonly vidgrabQueue: Queue,
private readonly ytdlService: YtdlService,
private readonly eventEmitter: EventEmitter2,
) {}
@Get()
@@ -116,6 +118,8 @@ export class WebController {
const job = await this.vidgrabQueue.add('download', body);
this.eventEmitter.emit('job.added', { job });
return {
jobId: job.id,
};

View File

@@ -9,6 +9,7 @@ import {
} from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Job } from 'bull';
import { throttle } from 'lodash';
import split from 'split2';
@@ -22,7 +23,10 @@ export class YtdlProcessor {
job.progress(progress);
}, 250);
constructor(private readonly configService: ConfigService) {}
constructor(
private readonly configService: ConfigService,
private readonly eventEmitter: EventEmitter2,
) {}
@Process('download')
async handleDownload(job: Job<QueueDto>) {
@@ -66,6 +70,8 @@ export class YtdlProcessor {
job.data,
)}...`,
);
this.eventEmitter.emit('job.updated', { job });
}
@OnQueueCompleted()
@@ -75,6 +81,8 @@ export class YtdlProcessor {
job.data,
)} and result ${result}...`,
);
this.eventEmitter.emit('job.updated', { job });
}
@OnQueueFailed()
@@ -84,11 +92,15 @@ export class YtdlProcessor {
job.data,
)} and error ${err}...`,
);
this.eventEmitter.emit('job.failed', { job, error: err });
}
@OnQueueError()
onError(err: Error) {
this.logger.error(`Error is ${err}...`);
this.eventEmitter.emit('job.error', { error: err });
}
@OnQueueProgress()
@@ -98,5 +110,7 @@ export class YtdlProcessor {
job.data,
)} and progress ${progress}...`,
);
this.eventEmitter.emit('job.updated', { job });
}
}