split MessageAggregator in EventAggregator and CommandExecutor

This commit is contained in:
kay.one
2013-09-13 23:36:07 -07:00
parent 909439f615
commit 64181ebdff
83 changed files with 296 additions and 233 deletions
@@ -10,6 +10,7 @@ using NzbDrone.Common.EnvironmentInfo;
using NzbDrone.Core.Configuration.Events;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Configuration
@@ -33,15 +34,15 @@ namespace NzbDrone.Core.Configuration
{
private const string CONFIG_ELEMENT_NAME = "Config";
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly ICached<string> _cache;
private readonly string _configFile;
public ConfigFileProvider(IAppFolderInfo appFolderInfo, ICacheManger cacheManger, IMessageAggregator messageAggregator)
public ConfigFileProvider(IAppFolderInfo appFolderInfo, ICacheManger cacheManger, IEventAggregator eventAggregator)
{
_cache = cacheManger.GetCache<string>(GetType());
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_configFile = appFolderInfo.GetConfigPath();
}
@@ -82,7 +83,7 @@ namespace NzbDrone.Core.Configuration
}
}
_messageAggregator.PublishEvent(new ConfigFileSavedEvent());
_eventAggregator.PublishEvent(new ConfigFileSavedEvent());
}
public int Port
@@ -1,6 +1,7 @@
using System.Linq;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Configuration
@@ -13,8 +14,8 @@ namespace NzbDrone.Core.Configuration
public class ConfigRepository : BasicRepository<Config>, IConfigRepository
{
public ConfigRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public ConfigRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
+5 -4
View File
@@ -7,6 +7,7 @@ using NzbDrone.Core.Download;
using NzbDrone.Core.Download.Clients.Nzbget;
using NzbDrone.Core.Download.Clients.Sabnzbd;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Configuration
@@ -19,14 +20,14 @@ namespace NzbDrone.Core.Configuration
public class ConfigService : IConfigService
{
private readonly IConfigRepository _repository;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly Logger _logger;
private static Dictionary<string, string> _cache;
public ConfigService(IConfigRepository repository, IMessageAggregator messageAggregator, Logger logger)
public ConfigService(IConfigRepository repository, IEventAggregator eventAggregator, Logger logger)
{
_repository = repository;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_logger = logger;
_cache = new Dictionary<string, string>();
}
@@ -69,7 +70,7 @@ namespace NzbDrone.Core.Configuration
SetValue(configValue.Key, configValue.Value.ToString());
}
_messageAggregator.PublishEvent(new ConfigSavedEvent());
_eventAggregator.PublishEvent(new ConfigSavedEvent());
}
public String SabHost
@@ -1,5 +1,6 @@
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.DataAugmentation.Scene
@@ -11,8 +12,8 @@ namespace NzbDrone.Core.DataAugmentation.Scene
public class SceneMappingRepository : BasicRepository<SceneMapping>, ISceneMappingRepository
{
public SceneMappingRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public SceneMappingRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
@@ -4,6 +4,8 @@ using NLog;
using NzbDrone.Common.Cache;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Parser;
namespace NzbDrone.Core.DataAugmentation.Scene
+5 -4
View File
@@ -7,6 +7,7 @@ using Marr.Data.QGen;
using NzbDrone.Core.Datastore.Events;
using NzbDrone.Common;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Datastore
@@ -38,17 +39,17 @@ namespace NzbDrone.Core.Datastore
public class BasicRepository<TModel> : IBasicRepository<TModel> where TModel : ModelBase, new()
{
private readonly IDatabase _database;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private IDataMapper DataMapper
{
get { return _database.GetDataMapper(); }
}
public BasicRepository(IDatabase database, IMessageAggregator messageAggregator)
public BasicRepository(IDatabase database, IEventAggregator eventAggregator)
{
_database = database;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
}
protected QueryBuilder<TModel> Query
@@ -240,7 +241,7 @@ namespace NzbDrone.Core.Datastore
{
if (PublishModelEvents)
{
_messageAggregator.PublishEvent(new ModelEvent<TModel>(model, action));
_eventAggregator.PublishEvent(new ModelEvent<TModel>(model, action));
}
}
+2 -1
View File
@@ -6,6 +6,7 @@ using NzbDrone.Common.Composition;
using NzbDrone.Core.Datastore.Migration.Framework;
using NzbDrone.Core.Instrumentation;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Datastore
@@ -38,7 +39,7 @@ namespace NzbDrone.Core.Datastore
container.Register<ILogRepository>(c =>
{
var db = c.Resolve<IDbFactory>().Create(MigrationType.Log);
return new LogRepository(db, c.Resolve<IMessageAggregator>());
return new LogRepository(db, c.Resolve<IEventAggregator>());
});
}
+5 -4
View File
@@ -1,6 +1,7 @@
using NLog;
using NzbDrone.Core.Instrumentation;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Parser.Model;
namespace NzbDrone.Core.Download
@@ -13,15 +14,15 @@ namespace NzbDrone.Core.Download
public class DownloadService : IDownloadService
{
private readonly IProvideDownloadClient _downloadClientProvider;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly Logger _logger;
public DownloadService(IProvideDownloadClient downloadClientProvider,
IMessageAggregator messageAggregator, Logger logger)
IEventAggregator eventAggregator, Logger logger)
{
_downloadClientProvider = downloadClientProvider;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_logger = logger;
}
@@ -39,7 +40,7 @@ namespace NzbDrone.Core.Download
downloadClient.DownloadNzb(remoteEpisode);
_logger.ProgressInfo("Report sent to download client. {0}", downloadTitle);
_messageAggregator.PublishEvent(new EpisodeGrabbedEvent(remoteEpisode));
_eventAggregator.PublishEvent(new EpisodeGrabbedEvent(remoteEpisode));
}
}
}
+3 -2
View File
@@ -4,6 +4,7 @@ using System.Linq;
using Marr.Data.QGen;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv;
namespace NzbDrone.Core.History
@@ -16,8 +17,8 @@ namespace NzbDrone.Core.History
public class HistoryRepository : BasicRepository<History>, IHistoryRepository
{
public HistoryRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public HistoryRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
+1
View File
@@ -6,6 +6,7 @@ using NzbDrone.Core.Datastore;
using NzbDrone.Core.Download;
using NzbDrone.Core.MediaFiles.Events;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv;
namespace NzbDrone.Core.History
@@ -2,6 +2,7 @@
using NzbDrone.Core.Download;
using NzbDrone.Core.Instrumentation;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.IndexerSearch
{
@@ -2,6 +2,7 @@
using NzbDrone.Core.Download;
using NzbDrone.Core.Instrumentation;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.IndexerSearch
{
@@ -3,6 +3,7 @@ using NLog;
using NzbDrone.Core.Download;
using NzbDrone.Core.Instrumentation;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Tv;
namespace NzbDrone.Core.IndexerSearch
+3 -2
View File
@@ -2,6 +2,7 @@
using System.Linq;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Indexers
@@ -14,8 +15,8 @@ namespace NzbDrone.Core.Indexers
public class IndexerRepository : BasicRepository<IndexerDefinition>, IIndexerRepository
{
public IndexerRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public IndexerRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
+1
View File
@@ -7,6 +7,7 @@ using NzbDrone.Core.Configuration;
using NzbDrone.Core.Indexers.Newznab;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using Omu.ValueInjecter;
namespace NzbDrone.Core.Indexers
+1
View File
@@ -4,6 +4,7 @@ using NzbDrone.Core.DecisionEngine;
using NzbDrone.Core.Download;
using NzbDrone.Core.Instrumentation;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Indexers
{
@@ -5,6 +5,7 @@ using NLog.Layouts;
using NLog.Targets;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Instrumentation
{
@@ -4,6 +4,7 @@ using NzbDrone.Common;
using NzbDrone.Common.EnvironmentInfo;
using NzbDrone.Core.Instrumentation.Commands;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Instrumentation
{
@@ -1,6 +1,7 @@
using System;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Instrumentation
@@ -12,8 +13,8 @@ namespace NzbDrone.Core.Instrumentation
public class LogRepository : BasicRepository<Log>, ILogRepository
{
public LogRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public LogRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
@@ -1,6 +1,7 @@
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Instrumentation.Commands;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Instrumentation
{
@@ -7,6 +7,7 @@ using NzbDrone.Core.Configuration;
using NzbDrone.Core.Configuration.Events;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Instrumentation
{
+3 -2
View File
@@ -2,6 +2,7 @@ using System;
using System.Linq;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Jobs
@@ -16,8 +17,8 @@ namespace NzbDrone.Core.Jobs
public class ScheduledTaskRepository : BasicRepository<ScheduledTask>, IScheduledTaskRepository
{
public ScheduledTaskRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public ScheduledTaskRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
+4 -4
View File
@@ -14,15 +14,15 @@ namespace NzbDrone.Core.Jobs
IHandle<ApplicationShutdownRequested>
{
private readonly ITaskManager _taskManager;
private readonly IMessageAggregator _messageAggregator;
private readonly ICommandExecutor _commandExecutor;
private readonly Logger _logger;
private static readonly Timer Timer = new Timer();
private static CancellationTokenSource _cancellationTokenSource;
public Scheduler(ITaskManager taskManager, IMessageAggregator messageAggregator, Logger logger)
public Scheduler(ITaskManager taskManager, ICommandExecutor commandExecutor, Logger logger)
{
_taskManager = taskManager;
_messageAggregator = messageAggregator;
_commandExecutor = commandExecutor;
_logger = logger;
}
@@ -52,7 +52,7 @@ namespace NzbDrone.Core.Jobs
try
{
_messageAggregator.PublishCommand(task.TypeName);
_commandExecutor.PublishCommand(task.TypeName);
}
catch (Exception e)
{
@@ -6,6 +6,7 @@ using NLog;
using NzbDrone.Common;
using NzbDrone.Common.EnvironmentInfo;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv;
using NzbDrone.Core.Tv.Events;
+4 -4
View File
@@ -23,25 +23,25 @@ namespace NzbDrone.Core.MediaFiles
private readonly IDiskProvider _diskProvider;
private readonly IMakeImportDecision _importDecisionMaker;
private readonly IImportApprovedEpisodes _importApprovedEpisodes;
private readonly IMessageAggregator _messageAggregator;
private readonly ICommandExecutor _commandExecutor;
private readonly Logger _logger;
public DiskScanService(IDiskProvider diskProvider,
IMakeImportDecision importDecisionMaker,
IImportApprovedEpisodes importApprovedEpisodes,
IMessageAggregator messageAggregator, Logger logger)
ICommandExecutor commandExecutor, Logger logger)
{
_diskProvider = diskProvider;
_importDecisionMaker = importDecisionMaker;
_importApprovedEpisodes = importApprovedEpisodes;
_messageAggregator = messageAggregator;
_commandExecutor = commandExecutor;
_logger = logger;
}
private void Scan(Series series)
{
_logger.ProgressInfo("Scanning disk for {0}", series.Title);
_messageAggregator.PublishCommand(new CleanMediaFileDb(series.Id));
_commandExecutor.PublishCommand(new CleanMediaFileDb(series.Id));
if (!_diskProvider.FolderExists(series.Path))
{
@@ -9,6 +9,7 @@ using NzbDrone.Core.MediaFiles.Commands;
using NzbDrone.Core.MediaFiles.EpisodeImport;
using NzbDrone.Core.MediaFiles.EpisodeImport.Specifications;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Parser;
using NzbDrone.Core.Tv;
@@ -4,6 +4,7 @@ using System.Linq;
using NLog;
using NzbDrone.Common;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Organizer;
using NzbDrone.Core.Parser.Model;
using NzbDrone.Core.Tv;
@@ -20,19 +21,19 @@ namespace NzbDrone.Core.MediaFiles
{
private readonly IEpisodeService _episodeService;
private readonly IBuildFileNames _buildFileNames;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly IDiskProvider _diskProvider;
private readonly Logger _logger;
public MoveEpisodeFiles(IEpisodeService episodeService,
IBuildFileNames buildFileNames,
IMessageAggregator messageAggregator,
IEventAggregator eventAggregator,
IDiskProvider diskProvider,
Logger logger)
{
_episodeService = episodeService;
_buildFileNames = buildFileNames;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_diskProvider = diskProvider;
_logger = logger;
}
@@ -6,6 +6,7 @@ using NLog;
using NzbDrone.Common;
using NzbDrone.Core.MediaFiles.Events;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.MediaFiles.EpisodeImport
@@ -20,19 +21,19 @@ namespace NzbDrone.Core.MediaFiles.EpisodeImport
private readonly IUpgradeMediaFiles _episodeFileUpgrader;
private readonly IMediaFileService _mediaFileService;
private readonly IDiskProvider _diskProvider;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly Logger _logger;
public ImportApprovedEpisodes(IUpgradeMediaFiles episodeFileUpgrader,
IMediaFileService mediaFileService,
IDiskProvider diskProvider,
IMessageAggregator messageAggregator,
IEventAggregator eventAggregator,
Logger logger)
{
_episodeFileUpgrader = episodeFileUpgrader;
_mediaFileService = mediaFileService;
_diskProvider = diskProvider;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_logger = logger;
}
@@ -70,8 +71,8 @@ namespace NzbDrone.Core.MediaFiles.EpisodeImport
{
episodeFile.SceneName = Path.GetFileNameWithoutExtension(localEpisode.Path.CleanFilePath());
episodeFile.Path = _episodeFileUpgrader.UpgradeEpisodeFile(episodeFile, localEpisode);
_messageAggregator.PublishEvent(new EpisodeImportedEvent(localEpisode, episodeFile));
_messageAggregator.PublishEvent(new EpisodeDownloadedEvent(localEpisode));
_eventAggregator.PublishEvent(new EpisodeImportedEvent(localEpisode, episodeFile));
_eventAggregator.PublishEvent(new EpisodeDownloadedEvent(localEpisode));
}
_mediaFileService.Add(episodeFile);
@@ -2,6 +2,7 @@ using System.Collections.Generic;
using System.Linq;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.MediaFiles
@@ -17,8 +18,8 @@ namespace NzbDrone.Core.MediaFiles
public class MediaFileRepository : BasicRepository<EpisodeFile>, IMediaFileRepository
{
public MediaFileRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public MediaFileRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
+6 -5
View File
@@ -3,6 +3,7 @@ using System.Linq;
using NLog;
using NzbDrone.Core.MediaFiles.Events;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv.Events;
using NzbDrone.Common;
@@ -23,21 +24,21 @@ namespace NzbDrone.Core.MediaFiles
public class MediaFileService : IMediaFileService, IHandleAsync<SeriesDeletedEvent>
{
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly IMediaFileRepository _mediaFileRepository;
private readonly Logger _logger;
public MediaFileService(IMediaFileRepository mediaFileRepository, IMessageAggregator messageAggregator, Logger logger)
public MediaFileService(IMediaFileRepository mediaFileRepository, IEventAggregator eventAggregator, Logger logger)
{
_mediaFileRepository = mediaFileRepository;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_logger = logger;
}
public EpisodeFile Add(EpisodeFile episodeFile)
{
var addedFile = _mediaFileRepository.Insert(episodeFile);
_messageAggregator.PublishEvent(new EpisodeFileAddedEvent(addedFile));
_eventAggregator.PublishEvent(new EpisodeFileAddedEvent(addedFile));
return addedFile;
}
@@ -50,7 +51,7 @@ namespace NzbDrone.Core.MediaFiles
{
_mediaFileRepository.Delete(episodeFile);
_messageAggregator.PublishEvent(new EpisodeFileDeletedEvent(episodeFile, forUpgrade));
_eventAggregator.PublishEvent(new EpisodeFileDeletedEvent(episodeFile, forUpgrade));
}
public bool Exists(string path)
@@ -4,6 +4,7 @@ using NLog;
using NzbDrone.Common;
using NzbDrone.Core.MediaFiles.Commands;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Tv;
namespace NzbDrone.Core.MediaFiles
@@ -6,6 +6,8 @@ using NzbDrone.Core.Instrumentation;
using NzbDrone.Core.MediaFiles.Commands;
using NzbDrone.Core.MediaFiles.Events;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv;
namespace NzbDrone.Core.MediaFiles
@@ -15,19 +17,19 @@ namespace NzbDrone.Core.MediaFiles
private readonly ISeriesService _seriesService;
private readonly IMediaFileService _mediaFileService;
private readonly IMoveEpisodeFiles _episodeFileMover;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly Logger _logger;
public RenameEpisodeFileService(ISeriesService seriesService,
IMediaFileService mediaFileService,
IMoveEpisodeFiles episodeFileMover,
IMessageAggregator messageAggregator,
IEventAggregator eventAggregator,
Logger logger)
{
_seriesService = seriesService;
_mediaFileService = mediaFileService;
_episodeFileMover = episodeFileMover;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_logger = logger;
}
@@ -59,7 +61,7 @@ namespace NzbDrone.Core.MediaFiles
if (renamed.Any())
{
_messageAggregator.PublishEvent(new SeriesRenamedEvent(series));
_eventAggregator.PublishEvent(new SeriesRenamedEvent(series));
}
}
+1 -1
View File
@@ -2,7 +2,7 @@ using System;
using FluentMigrator.Runner;
using NzbDrone.Common.Messaging;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging.Tracking;
using NzbDrone.Core.Messaging.Commands.Tracking;
namespace NzbDrone.Core.Messaging.Commands
{
@@ -4,80 +4,33 @@ using System.Threading.Tasks;
using NLog;
using NzbDrone.Common;
using NzbDrone.Common.EnsureThat;
using NzbDrone.Common.Messaging;
using NzbDrone.Common.Serializer;
using NzbDrone.Common.TPL;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Messaging.Commands.Tracking;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Messaging.Tracking;
using NzbDrone.Core.ProgressMessaging;
namespace NzbDrone.Core.Messaging
namespace NzbDrone.Core.Messaging.Commands
{
public class MessageAggregator : IMessageAggregator
public class CommandExecutor : ICommandExecutor
{
private readonly Logger _logger;
private readonly IServiceFactory _serviceFactory;
private readonly ITrackCommands _trackCommands;
private readonly IEventAggregator _eventAggregator;
private readonly TaskFactory _taskFactory;
public MessageAggregator(Logger logger, IServiceFactory serviceFactory, ITrackCommands trackCommands)
public CommandExecutor(Logger logger, IServiceFactory serviceFactory, ITrackCommands trackCommands, IEventAggregator eventAggregator)
{
var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
_logger = logger;
_serviceFactory = serviceFactory;
_trackCommands = trackCommands;
_eventAggregator = eventAggregator;
_taskFactory = new TaskFactory(scheduler);
}
public void PublishEvent<TEvent>(TEvent @event) where TEvent : class ,IEvent
{
Ensure.That(() => @event).IsNotNull();
var eventName = GetEventName(@event.GetType());
_logger.Trace("Publishing {0}", eventName);
//call synchronous handlers first.
foreach (var handler in _serviceFactory.BuildAll<IHandle<TEvent>>())
{
try
{
_logger.Trace("{0} -> {1}", eventName, handler.GetType().Name);
handler.Handle(@event);
_logger.Trace("{0} <- {1}", eventName, handler.GetType().Name);
}
catch (Exception e)
{
_logger.ErrorException(string.Format("{0} failed while processing [{1}]", handler.GetType().Name, eventName), e);
}
}
foreach (var handler in _serviceFactory.BuildAll<IHandleAsync<TEvent>>())
{
var handlerLocal = handler;
_taskFactory.StartNew(() =>
{
_logger.Trace("{0} ~> {1}", eventName, handlerLocal.GetType().Name);
handlerLocal.HandleAsync(@event);
_logger.Trace("{0} <~ {1}", eventName, handlerLocal.GetType().Name);
}, TaskCreationOptions.PreferFairness)
.LogExceptions();
}
}
private static string GetEventName(Type eventType)
{
if (!eventType.IsGenericType)
{
return eventType.Name;
}
return string.Format("{0}<{1}>", eventType.Name.Remove(eventType.Name.IndexOf('`')), eventType.GetGenericArguments()[0].Name);
}
public void PublishCommand<TCommand>(TCommand command) where TCommand : Command
{
Ensure.That(() => command).IsNotNull();
@@ -148,7 +101,7 @@ namespace NzbDrone.Core.Messaging
try
{
_trackCommands.Start(command);
PublishEvent(new CommandUpdatedEvent(command));
_eventAggregator.PublishEvent(new CommandUpdatedEvent(command));
if (!MappedDiagnosticsContext.Contains("CommandId") && command.SendUpdatesToClient)
{
@@ -157,13 +110,10 @@ namespace NzbDrone.Core.Messaging
handler.Execute((TCommand)command);
_trackCommands.Completed(command);
PublishEvent(new CommandUpdatedEvent(command));
}
catch (Exception e)
{
_trackCommands.Failed(command, e);
PublishEvent(new CommandUpdatedEvent(command));
throw;
}
finally
@@ -172,11 +122,10 @@ namespace NzbDrone.Core.Messaging
{
MappedDiagnosticsContext.Remove("CommandId");
}
_eventAggregator.PublishEvent(new CommandUpdatedEvent(command));
_eventAggregator.PublishEvent(new CommandExecutedEvent(command));
}
PublishEvent(new CommandExecutedEvent(command));
PublishEvent(new CommandUpdatedEvent(command));
_logger.Trace("{0} <- {1} [{2}]", command.GetType().Name, handler.GetType().Name, command.Runtime.ToString(""));
}
}
@@ -0,0 +1,10 @@
namespace NzbDrone.Core.Messaging.Commands
{
public interface ICommandExecutor
{
void PublishCommand<TCommand>(TCommand command) where TCommand : Command;
void PublishCommand(string commandTypeName);
Command PublishCommandAsync<TCommand>(TCommand command) where TCommand : Command;
Command PublishCommandAsync(string commandTypeName);
}
}
@@ -1,6 +1,4 @@
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Messaging
namespace NzbDrone.Core.Messaging.Commands
{
public interface IExecute<TCommand> : IProcessMessage<TCommand> where TCommand : Command
{
@@ -1,4 +1,4 @@
namespace NzbDrone.Core.Messaging.Tracking
namespace NzbDrone.Core.Messaging.Commands.Tracking
{
public enum CommandStatus
{
@@ -2,9 +2,8 @@ using System;
using System.Collections.Generic;
using System.Linq;
using NzbDrone.Common.Cache;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Messaging.Tracking
namespace NzbDrone.Core.Messaging.Commands.Tracking
{
public interface ITrackCommands
{
@@ -1,7 +1,6 @@
using System;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Messaging.Tracking
namespace NzbDrone.Core.Messaging.Commands.Tracking
{
public class ExistingCommand
{
@@ -0,0 +1,7 @@
namespace NzbDrone.Core.Messaging.Commands.Tracking
{
public class TrackedCommandCleanupCommand : Command
{
}
}
@@ -0,0 +1,73 @@
using System;
using System.Threading.Tasks;
using NLog;
using NzbDrone.Common;
using NzbDrone.Common.EnsureThat;
using NzbDrone.Common.Messaging;
using NzbDrone.Common.TPL;
namespace NzbDrone.Core.Messaging.Events
{
public class EventAggregator : IEventAggregator
{
private readonly Logger _logger;
private readonly IServiceFactory _serviceFactory;
private readonly TaskFactory _taskFactory;
public EventAggregator(Logger logger, IServiceFactory serviceFactory)
{
var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
_logger = logger;
_serviceFactory = serviceFactory;
_taskFactory = new TaskFactory(scheduler);
}
public void PublishEvent<TEvent>(TEvent @event) where TEvent : class ,IEvent
{
Ensure.That(() => @event).IsNotNull();
var eventName = GetEventName(@event.GetType());
_logger.Trace("Publishing {0}", eventName);
//call synchronous handlers first.
foreach (var handler in _serviceFactory.BuildAll<IHandle<TEvent>>())
{
try
{
_logger.Trace("{0} -> {1}", eventName, handler.GetType().Name);
handler.Handle(@event);
_logger.Trace("{0} <- {1}", eventName, handler.GetType().Name);
}
catch (Exception e)
{
_logger.ErrorException(string.Format("{0} failed while processing [{1}]", handler.GetType().Name, eventName), e);
}
}
foreach (var handler in _serviceFactory.BuildAll<IHandleAsync<TEvent>>())
{
var handlerLocal = handler;
_taskFactory.StartNew(() =>
{
_logger.Trace("{0} ~> {1}", eventName, handlerLocal.GetType().Name);
handlerLocal.HandleAsync(@event);
_logger.Trace("{0} <~ {1}", eventName, handlerLocal.GetType().Name);
}, TaskCreationOptions.PreferFairness)
.LogExceptions();
}
}
private static string GetEventName(Type eventType)
{
if (!eventType.IsGenericType)
{
return eventType.Name;
}
return string.Format("{0}<{1}>", eventType.Name.Remove(eventType.Name.IndexOf('`')), eventType.GetGenericArguments()[0].Name);
}
}
}
@@ -0,0 +1,9 @@
using NzbDrone.Common.Messaging;
namespace NzbDrone.Core.Messaging.Events
{
public interface IEventAggregator
{
void PublishEvent<TEvent>(TEvent @event) where TEvent : class, IEvent;
}
}
@@ -1,6 +1,6 @@
using NzbDrone.Common.Messaging;
namespace NzbDrone.Core.Messaging
namespace NzbDrone.Core.Messaging.Events
{
public interface IHandle<TEvent> : IProcessMessage<TEvent> where TEvent : IEvent
{
@@ -1,17 +0,0 @@
using NzbDrone.Common.Messaging;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Messaging
{
/// <summary>
/// Enables loosely-coupled publication of events.
/// </summary>
public interface IMessageAggregator
{
void PublishEvent<TEvent>(TEvent @event) where TEvent : class, IEvent;
void PublishCommand<TCommand>(TCommand command) where TCommand : Command;
void PublishCommand(string commandTypeName);
Command PublishCommandAsync<TCommand>(TCommand command) where TCommand : Command;
Command PublishCommandAsync(string commandTypeName);
}
}
@@ -1,18 +0,0 @@
using System;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Messaging
{
public static class MessageExtensions
{
public static string GetExecutorName(this Type commandType)
{
if (!typeof(Command).IsAssignableFrom(commandType))
{
throw new ArgumentException("commandType must implement ICommand");
}
return string.Format("I{0}Executor", commandType.Name);
}
}
}
@@ -1,9 +0,0 @@
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Messaging.Tracking
{
public class TrackedCommandCleanupCommand : Command
{
}
}
@@ -3,6 +3,7 @@ using System.Net;
using System.Net.Mail;
using NLog;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using Omu.ValueInjecter;
namespace NzbDrone.Core.Notifications.Email
@@ -5,6 +5,7 @@ using Growl.Connector;
using NLog;
using NzbDrone.Common.Instrumentation;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using GrowlNotification = Growl.Connector.Notification;
namespace NzbDrone.Core.Notifications.Growl
@@ -2,6 +2,7 @@
using System.Linq;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Notifications
@@ -14,8 +15,8 @@ namespace NzbDrone.Core.Notifications
public class NotificationRepository : BasicRepository<NotificationDefinition>, INotificationRepository
{
public NotificationRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public NotificationRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
@@ -7,6 +7,7 @@ using NzbDrone.Common.Serializer;
using NzbDrone.Core.Download;
using NzbDrone.Core.MediaFiles.Events;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv;
using Omu.ValueInjecter;
@@ -5,6 +5,7 @@ using System.Xml.Linq;
using NLog;
using NzbDrone.Common;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
namespace NzbDrone.Core.Notifications.Plex
{
@@ -1,6 +1,7 @@
using System;
using NLog;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using Prowlin;
namespace NzbDrone.Core.Notifications.Prowl
@@ -1,4 +1,5 @@
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using RestSharp;
using NzbDrone.Core.Rest;
@@ -7,6 +7,7 @@ using NLog;
using NzbDrone.Common;
using NzbDrone.Common.Instrumentation;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Tv;
using NzbDrone.Core.Model.Xbmc;
+3 -2
View File
@@ -233,6 +233,7 @@
<Compile Include="Instrumentation\DeleteLogFilesService.cs" />
<Compile Include="MediaFiles\MediaFileExtensions.cs" />
<Compile Include="MediaFiles\MediaInfo\VideoFileInfoReader.cs" />
<Compile Include="Messaging\CommandExecutor.cs" />
<Compile Include="MetadataSource\Trakt\TraktException.cs" />
<Compile Include="NzbDroneClientException.cs" />
<Compile Include="Instrumentation\LoggerExtensions.cs" />
@@ -246,9 +247,9 @@
<Compile Include="Messaging\Events\CommandExecutedEvent.cs" />
<Compile Include="Messaging\IExecute.cs" />
<Compile Include="Messaging\IHandle.cs" />
<Compile Include="Messaging\IMessageAggregator.cs" />
<Compile Include="Messaging\IEventAggregator.cs" />
<Compile Include="Messaging\IProcessMessage.cs" />
<Compile Include="Messaging\MessageAggregator.cs" />
<Compile Include="Messaging\EventAggregator.cs" />
<Compile Include="Messaging\MessageExtensions.cs" />
<Compile Include="Messaging\Tracking\CommandStatus.cs" />
<Compile Include="Messaging\Tracking\CommandTrackingService.cs" />
@@ -4,20 +4,21 @@ using NLog.Targets;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Messaging.Tracking;
using NzbDrone.Core.Messaging.Commands.Tracking;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.ProgressMessaging
{
public class ProgressMessageTarget : Target, IHandle<ApplicationStartedEvent>
{
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly ITrackCommands _trackCommands;
private static LoggingRule _rule;
public ProgressMessageTarget(IMessageAggregator messageAggregator, ITrackCommands trackCommands)
public ProgressMessageTarget(IEventAggregator eventAggregator, ITrackCommands trackCommands)
{
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_trackCommands = trackCommands;
}
@@ -28,7 +29,7 @@ namespace NzbDrone.Core.ProgressMessaging
if (IsClientMessage(logEvent, command))
{
command.SetMessage(logEvent.FormattedMessage);
_messageAggregator.PublishEvent(new CommandUpdatedEvent(command));
_eventAggregator.PublishEvent(new CommandUpdatedEvent(command));
}
}
+2
View File
@@ -6,6 +6,8 @@ using NzbDrone.Common.Cache;
using NzbDrone.Common.Instrumentation;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv;
using NzbDrone.Core.Tv.Events;
@@ -1,5 +1,6 @@
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Qualities
@@ -11,8 +12,8 @@ namespace NzbDrone.Core.Qualities
public class QualityProfileRepository : BasicRepository<QualityProfile>, IQualityProfileRepository
{
public QualityProfileRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public QualityProfileRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
}
@@ -3,6 +3,7 @@ using System.Linq;
using NLog;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv;
@@ -2,6 +2,7 @@
using System.Linq;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Qualities
@@ -13,8 +14,8 @@ namespace NzbDrone.Core.Qualities
public class QualitySizeRepository : BasicRepository<QualitySize>, IQualitySizeRepository
{
public QualitySizeRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public QualitySizeRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
@@ -3,6 +3,7 @@ using System.Linq;
using NLog;
using NzbDrone.Core.Lifecycle;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Qualities
{
+2 -2
View File
@@ -31,8 +31,8 @@ namespace NzbDrone.Core.Tv
{
private readonly IDatabase _database;
public EpisodeRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public EpisodeRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
_database = database;
}
+1
View File
@@ -7,6 +7,7 @@ using NzbDrone.Core.Configuration;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.MediaFiles.Events;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv.Events;
namespace NzbDrone.Core.Tv
+7 -6
View File
@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using NLog;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Tv.Events;
namespace NzbDrone.Core.Tv
@@ -15,13 +16,13 @@ namespace NzbDrone.Core.Tv
public class RefreshEpisodeService : IRefreshEpisodeService
{
private readonly IEpisodeService _episodeService;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly Logger _logger;
public RefreshEpisodeService(IEpisodeService episodeService, IMessageAggregator messageAggregator, Logger logger)
public RefreshEpisodeService(IEpisodeService episodeService, IEventAggregator eventAggregator, Logger logger)
{
_episodeService = episodeService;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_logger = logger;
}
@@ -85,17 +86,17 @@ namespace NzbDrone.Core.Tv
if (newList.Any())
{
_messageAggregator.PublishEvent(new EpisodeInfoAddedEvent(newList, series));
_eventAggregator.PublishEvent(new EpisodeInfoAddedEvent(newList, series));
}
if (updateList.Any())
{
_messageAggregator.PublishEvent(new EpisodeInfoUpdatedEvent(updateList));
_eventAggregator.PublishEvent(new EpisodeInfoUpdatedEvent(updateList));
}
if (existingEpisodes.Any())
{
_messageAggregator.PublishEvent(new EpisodeInfoDeletedEvent(updateList));
_eventAggregator.PublishEvent(new EpisodeInfoDeletedEvent(updateList));
}
if (failCount != 0)
+4 -4
View File
@@ -18,16 +18,16 @@ namespace NzbDrone.Core.Tv
private readonly IProvideSeriesInfo _seriesInfo;
private readonly ISeriesService _seriesService;
private readonly IRefreshEpisodeService _refreshEpisodeService;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly IDailySeriesService _dailySeriesService;
private readonly Logger _logger;
public RefreshSeriesService(IProvideSeriesInfo seriesInfo, ISeriesService seriesService, IRefreshEpisodeService refreshEpisodeService, IMessageAggregator messageAggregator, IDailySeriesService dailySeriesService, Logger logger)
public RefreshSeriesService(IProvideSeriesInfo seriesInfo, ISeriesService seriesService, IRefreshEpisodeService refreshEpisodeService, IEventAggregator eventAggregator, IDailySeriesService dailySeriesService, Logger logger)
{
_seriesInfo = seriesInfo;
_seriesService = seriesService;
_refreshEpisodeService = refreshEpisodeService;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_dailySeriesService = dailySeriesService;
_logger = logger;
}
@@ -71,7 +71,7 @@ namespace NzbDrone.Core.Tv
_refreshEpisodeService.RefreshEpisodeInfo(series, tuple.Item2);
_logger.Debug("Finished series refresh for {0}", series.Title);
_messageAggregator.PublishEvent(new SeriesUpdatedEvent(series));
_eventAggregator.PublishEvent(new SeriesUpdatedEvent(series));
}
private List<Season> UpdateSeasons(Series series, Series seriesInfo)
+3 -2
View File
@@ -2,6 +2,7 @@ using System.Collections.Generic;
using System.Linq;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Tv
@@ -13,8 +14,8 @@ namespace NzbDrone.Core.Tv
public class SeasonRepository : BasicRepository<Series>, ISeasonRepository
{
public SeasonRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public SeasonRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
+3 -2
View File
@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
namespace NzbDrone.Core.Tv
@@ -21,8 +22,8 @@ namespace NzbDrone.Core.Tv
public class SeriesRepository : BasicRepository<Series>, ISeriesRepository
{
public SeriesRepository(IDatabase database, IMessageAggregator messageAggregator)
: base(database, messageAggregator)
public SeriesRepository(IDatabase database, IEventAggregator eventAggregator)
: base(database, eventAggregator)
{
}
+6 -5
View File
@@ -7,6 +7,7 @@ using NzbDrone.Common.EnsureThat;
using NzbDrone.Core.Configuration;
using NzbDrone.Core.DataAugmentation.Scene;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Events;
using NzbDrone.Core.Organizer;
using NzbDrone.Core.Tv.Events;
@@ -30,21 +31,21 @@ namespace NzbDrone.Core.Tv
{
private readonly ISeriesRepository _seriesRepository;
private readonly IConfigService _configService;
private readonly IMessageAggregator _messageAggregator;
private readonly IEventAggregator _eventAggregator;
private readonly ISceneMappingService _sceneMappingService;
private readonly IEpisodeService _episodeService;
private readonly Logger _logger;
public SeriesService(ISeriesRepository seriesRepository,
IConfigService configServiceService,
IMessageAggregator messageAggregator,
IEventAggregator eventAggregator,
ISceneMappingService sceneMappingService,
IEpisodeService episodeService,
Logger logger)
{
_seriesRepository = seriesRepository;
_configService = configServiceService;
_messageAggregator = messageAggregator;
_eventAggregator = eventAggregator;
_sceneMappingService = sceneMappingService;
_episodeService = episodeService;
_logger = logger;
@@ -73,7 +74,7 @@ namespace NzbDrone.Core.Tv
newSeries.SeasonFolder = _configService.UseSeasonFolder;
_seriesRepository.Insert(newSeries);
_messageAggregator.PublishEvent(new SeriesAddedEvent(newSeries));
_eventAggregator.PublishEvent(new SeriesAddedEvent(newSeries));
return newSeries;
}
@@ -109,7 +110,7 @@ namespace NzbDrone.Core.Tv
{
var series = _seriesRepository.Get(seriesId);
_seriesRepository.Delete(seriesId);
_messageAggregator.PublishEvent(new SeriesDeletedEvent(series, deleteFiles));
_eventAggregator.PublishEvent(new SeriesDeletedEvent(series, deleteFiles));
}
public List<Series> GetAllSeries()
@@ -4,6 +4,7 @@ using NLog;
using NzbDrone.Common;
using NzbDrone.Common.EnvironmentInfo;
using NzbDrone.Core.Messaging;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Update.Commands;
using NzbDrone.Core.Instrumentation;