signalr cleanup

This commit is contained in:
kay.one
2013-09-10 23:33:47 -07:00
committed by Keivan Beigi
parent feda4a9b67
commit 25e2c98c45
219 changed files with 2035 additions and 1495 deletions
@@ -4,7 +4,6 @@ using Exceptron.Client;
using Exceptron.Client.Configuration;
using NLog;
using NLog.Common;
using NLog.Config;
using NLog.Layouts;
using NLog.Targets;
using NzbDrone.Common.EnvironmentInfo;
@@ -23,9 +22,6 @@ namespace NzbDrone.Common.Instrumentation
/// </summary>
public IExceptronClient ExceptronClient { get; internal set; }
protected override void InitializeTarget()
{
var config = new ExceptronConfiguration
@@ -1,53 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NLog;
using NzbDrone.Common.Messaging.Tracking;
namespace NzbDrone.Common.Instrumentation
{
public static class LoggerExtensions
{
public static void Progress(this Logger logger, string message)
{
LogProgressMessage(logger, message, ProcessState.Running);
}
public static void Progress(this Logger logger, string message, params object[] args)
{
var formattedMessage = String.Format(message, args);
Progress(logger, formattedMessage);
}
public static void Complete(this Logger logger, string message)
{
LogProgressMessage(logger, message, ProcessState.Completed);
}
public static void Complete(this Logger logger, string message, params object[] args)
{
var formattedMessage = String.Format(message, args);
Complete(logger, formattedMessage);
}
public static void Failed(this Logger logger, string message)
{
LogProgressMessage(logger, message, ProcessState.Failed);
}
public static void Failed(this Logger logger, string message, params object[] args)
{
var formattedMessage = String.Format(message, args);
Failed(logger, formattedMessage);
}
private static void LogProgressMessage(Logger logger, string message, ProcessState state)
{
var logEvent = new LogEventInfo(LogLevel.Info, logger.Name, message);
logEvent.Properties.Add("Status", state);
logger.Log(logEvent);
}
}
}
@@ -1,44 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NzbDrone.Common.EnvironmentInfo;
namespace NzbDrone.Common.Messaging
{
public class CommandEqualityComparer : IEqualityComparer<ICommand>
{
public bool Equals(ICommand x, ICommand y)
{
var xProperties = x.GetType().GetProperties();
var yProperties = y.GetType().GetProperties();
foreach (var xProperty in xProperties)
{
if (xProperty.Name == "CommandId")
{
continue;
}
var yProperty = yProperties.SingleOrDefault(p => p.Name == xProperty.Name);
if (yProperty == null)
{
continue;
}
if (!xProperty.GetValue(x, null).Equals(yProperty.GetValue(y, null)))
{
return false;
}
}
return true;
}
public int GetHashCode(ICommand obj)
{
return obj.CommandId.GetHashCode();
}
}
}
@@ -1,14 +0,0 @@
using NzbDrone.Common.Messaging.Tracking;
namespace NzbDrone.Common.Messaging.Events
{
public class CommandCompletedEvent : IEvent
{
public TrackedCommand TrackedCommand { get; private set; }
public CommandCompletedEvent(TrackedCommand trackedCommand)
{
TrackedCommand = trackedCommand;
}
}
}
@@ -1,14 +0,0 @@
using NzbDrone.Common.Messaging.Tracking;
namespace NzbDrone.Common.Messaging.Events
{
public class CommandExecutedEvent : IEvent
{
public TrackedCommand TrackedCommand { get; private set; }
public CommandExecutedEvent(TrackedCommand trackedCommand)
{
TrackedCommand = trackedCommand;
}
}
}
@@ -1,17 +0,0 @@
using System;
using NzbDrone.Common.Messaging.Tracking;
namespace NzbDrone.Common.Messaging.Events
{
public class CommandFailedEvent : IEvent
{
public TrackedCommand TrackedCommand { get; private set; }
public Exception Exception { get; private set; }
public CommandFailedEvent(TrackedCommand trackedCommand, Exception exception)
{
TrackedCommand = trackedCommand;
Exception = exception;
}
}
}
@@ -1,14 +0,0 @@
using NzbDrone.Common.Messaging.Tracking;
namespace NzbDrone.Common.Messaging.Events
{
public class CommandStartedEvent : IEvent
{
public TrackedCommand TrackedCommand { get; private set; }
public CommandStartedEvent(TrackedCommand trackedCommand)
{
TrackedCommand = trackedCommand;
}
}
}
-10
View File
@@ -1,10 +0,0 @@
using System;
using System.Collections.Generic;
namespace NzbDrone.Common.Messaging
{
public interface ICommand : IMessage
{
String CommandId { get; }
}
}
+1 -1
View File
@@ -1,4 +1,4 @@
namespace NzbDrone.Common.Messaging
namespace NzbDrone.Common.Messaging
{
public interface IEvent : IMessage
{
-7
View File
@@ -1,7 +0,0 @@
namespace NzbDrone.Common.Messaging
{
public interface IExecute<TCommand> : IProcessMessage<TCommand> where TCommand : ICommand
{
void Execute(TCommand message);
}
}
-12
View File
@@ -1,12 +0,0 @@
namespace NzbDrone.Common.Messaging
{
public interface IHandle<TEvent> : IProcessMessage<TEvent> where TEvent : IEvent
{
void Handle(TEvent message);
}
public interface IHandleAsync<TEvent> : IProcessMessageAsync<TEvent> where TEvent : IEvent
{
void HandleAsync(TEvent message);
}
}
@@ -1,16 +0,0 @@
using NzbDrone.Common.Messaging.Tracking;
namespace NzbDrone.Common.Messaging
{
/// <summary>
/// Enables loosely-coupled publication of events.
/// </summary>
public interface IMessageAggregator
{
void PublishEvent<TEvent>(TEvent @event) where TEvent : class, IEvent;
void PublishCommand<TCommand>(TCommand command) where TCommand : class, ICommand;
void PublishCommand(string commandTypeName);
TrackedCommand PublishCommandAsync<TCommand>(TCommand command) where TCommand : class, ICommand;
TrackedCommand PublishCommandAsync(string commandTypeName);
}
}
@@ -1,10 +0,0 @@
namespace NzbDrone.Common.Messaging
{
public interface IProcessMessage { }
public interface IProcessMessageAsync : IProcessMessage { }
public interface IProcessMessage<TMessage> : IProcessMessage { }
public interface IProcessMessageAsync<TMessage> : IProcessMessageAsync { }
}
@@ -1,174 +0,0 @@
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using NLog;
using NzbDrone.Common.EnsureThat;
using NzbDrone.Common.Messaging.Events;
using NzbDrone.Common.Messaging.Tracking;
using NzbDrone.Common.Serializer;
using NzbDrone.Common.TPL;
namespace NzbDrone.Common.Messaging
{
public class MessageAggregator : IMessageAggregator
{
private readonly Logger _logger;
private readonly IServiceFactory _serviceFactory;
private readonly ITrackCommands _trackCommands;
private readonly TaskFactory _taskFactory;
public MessageAggregator(Logger logger, IServiceFactory serviceFactory, ITrackCommands trackCommands)
{
_logger = logger;
_serviceFactory = serviceFactory;
_trackCommands = trackCommands;
var scheduler = new LimitedConcurrencyLevelTaskScheduler(2);
_taskFactory = new TaskFactory(scheduler);
}
public void PublishEvent<TEvent>(TEvent @event) where TEvent : class ,IEvent
{
Ensure.That(() => @event).IsNotNull();
var eventName = GetEventName(@event.GetType());
_logger.Trace("Publishing {0}", eventName);
//call synchronous handlers first.
foreach (var handler in _serviceFactory.BuildAll<IHandle<TEvent>>())
{
try
{
_logger.Debug("{0} -> {1}", eventName, handler.GetType().Name);
handler.Handle(@event);
_logger.Debug("{0} <- {1}", eventName, handler.GetType().Name);
}
catch (Exception e)
{
_logger.ErrorException(string.Format("{0} failed while processing [{1}]", handler.GetType().Name, eventName), e);
}
}
foreach (var handler in _serviceFactory.BuildAll<IHandleAsync<TEvent>>())
{
var handlerLocal = handler;
_taskFactory.StartNew(() =>
{
_logger.Debug("{0} ~> {1}", eventName, handlerLocal.GetType().Name);
handlerLocal.HandleAsync(@event);
_logger.Debug("{0} <~ {1}", eventName, handlerLocal.GetType().Name);
}, TaskCreationOptions.PreferFairness)
.LogExceptions();
}
}
private static string GetEventName(Type eventType)
{
if (!eventType.IsGenericType)
{
return eventType.Name;
}
return string.Format("{0}<{1}>", eventType.Name.Remove(eventType.Name.IndexOf('`')), eventType.GetGenericArguments()[0].Name);
}
public void PublishCommand<TCommand>(TCommand command) where TCommand : class, ICommand
{
Ensure.That(() => command).IsNotNull();
_logger.Trace("Publishing {0}", command.GetType().Name);
var trackedCommand = _trackCommands.TrackIfNew(command);
if (trackedCommand == null)
{
_logger.Info("Command is already in progress: {0}", command.GetType().Name);
return;
}
ExecuteCommand<TCommand>(trackedCommand);
}
public void PublishCommand(string commandTypeName)
{
dynamic command = GetCommand(commandTypeName);
PublishCommand(command);
}
public TrackedCommand PublishCommandAsync<TCommand>(TCommand command) where TCommand : class, ICommand
{
Ensure.That(() => command).IsNotNull();
_logger.Trace("Publishing {0}", command.GetType().Name);
var existingCommand = _trackCommands.TrackNewOrGet(command);
if (existingCommand.Existing)
{
_logger.Info("Command is already in progress: {0}", command.GetType().Name);
return existingCommand.TrackedCommand;
}
_taskFactory.StartNew(() => ExecuteCommand<TCommand>(existingCommand.TrackedCommand)
, TaskCreationOptions.PreferFairness)
.LogExceptions();
return existingCommand.TrackedCommand;
}
public TrackedCommand PublishCommandAsync(string commandTypeName)
{
dynamic command = GetCommand(commandTypeName);
return PublishCommandAsync(command);
}
private dynamic GetCommand(string commandTypeName)
{
var commandType = _serviceFactory.GetImplementations(typeof(ICommand))
.Single(c => c.FullName.Equals(commandTypeName, StringComparison.InvariantCultureIgnoreCase));
return Json.Deserialize("{}", commandType);
}
private void ExecuteCommand<TCommand>(TrackedCommand trackedCommand) where TCommand : class, ICommand
{
var command = (TCommand)trackedCommand.Command;
var handlerContract = typeof(IExecute<>).MakeGenericType(command.GetType());
var handler = (IExecute<TCommand>)_serviceFactory.Build(handlerContract);
_logger.Debug("{0} -> {1}", command.GetType().Name, handler.GetType().Name);
var sw = Stopwatch.StartNew();
try
{
if (!MappedDiagnosticsContext.Contains("CommandId"))
{
MappedDiagnosticsContext.Set("CommandId", trackedCommand.Command.CommandId);
}
PublishEvent(new CommandStartedEvent(trackedCommand));
handler.Execute(command);
sw.Stop();
_trackCommands.Completed(trackedCommand, sw.Elapsed);
PublishEvent(new CommandCompletedEvent(trackedCommand));
}
catch (Exception e)
{
_trackCommands.Failed(trackedCommand, e);
PublishEvent(new CommandFailedEvent(trackedCommand, e));
throw;
}
finally
{
PublishEvent(new CommandExecutedEvent(trackedCommand));
}
_logger.Debug("{0} <- {1} [{2}]", command.GetType().Name, handler.GetType().Name, sw.Elapsed.ToString(""));
}
}
}
@@ -1,17 +0,0 @@
using System;
namespace NzbDrone.Common.Messaging
{
public static class MessageExtensions
{
public static string GetExecutorName(this Type commandType)
{
if (!typeof(ICommand).IsAssignableFrom(commandType))
{
throw new ArgumentException("commandType must implement ICommand");
}
return string.Format("I{0}Executor", commandType.Name);
}
}
}
-15
View File
@@ -1,15 +0,0 @@
using System;
namespace NzbDrone.Common.Messaging
{
public class TestCommand : ICommand
{
public int Duration { get; set; }
public String CommandId { get; private set; }
public TestCommand()
{
Duration = 4000;
}
}
}
@@ -1,12 +0,0 @@
using System.Threading;
namespace NzbDrone.Common.Messaging
{
public class TestCommandExecutor : IExecute<TestCommand>
{
public void Execute(TestCommand message)
{
Thread.Sleep(message.Duration);
}
}
}
@@ -1,127 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Remoting;
using NzbDrone.Common.Cache;
namespace NzbDrone.Common.Messaging.Tracking
{
public interface ITrackCommands
{
TrackedCommand TrackIfNew(ICommand command);
ExistingCommand TrackNewOrGet(ICommand command);
TrackedCommand Completed(TrackedCommand trackedCommand, TimeSpan runtime);
TrackedCommand Failed(TrackedCommand trackedCommand, Exception e);
List<TrackedCommand> AllTracked();
Boolean ExistingCommand(ICommand command);
TrackedCommand FindExisting(ICommand command);
}
public class TrackCommands : ITrackCommands, IExecute<TrackedCommandCleanupCommand>
{
private readonly ICached<TrackedCommand> _cache;
public TrackCommands(ICacheManger cacheManger)
{
_cache = cacheManger.GetCache<TrackedCommand>(GetType());
}
public TrackedCommand TrackIfNew(ICommand command)
{
if (ExistingCommand(command))
{
return null;
}
var trackedCommand = new TrackedCommand(command, ProcessState.Running);
Store(trackedCommand);
return trackedCommand;
}
public ExistingCommand TrackNewOrGet(ICommand command)
{
var trackedCommand = FindExisting(command);
if (trackedCommand == null)
{
trackedCommand = new TrackedCommand(command, ProcessState.Running);
Store(trackedCommand);
return new ExistingCommand(false, trackedCommand);
}
return new ExistingCommand(true, trackedCommand);
}
public TrackedCommand Completed(TrackedCommand trackedCommand, TimeSpan runtime)
{
trackedCommand.StateChangeTime = DateTime.UtcNow;
trackedCommand.State = ProcessState.Completed;
trackedCommand.Runtime = runtime;
Store(trackedCommand);
return trackedCommand;
}
public TrackedCommand Failed(TrackedCommand trackedCommand, Exception e)
{
trackedCommand.StateChangeTime = DateTime.UtcNow;
trackedCommand.State = ProcessState.Failed;
trackedCommand.Exception = e;
Store(trackedCommand);
return trackedCommand;
}
public List<TrackedCommand> AllTracked()
{
return _cache.Values.ToList();
}
public bool ExistingCommand(ICommand command)
{
return FindExisting(command) != null;
}
public TrackedCommand FindExisting(ICommand command)
{
var comparer = new CommandEqualityComparer();
return Running(command.GetType()).SingleOrDefault(t => comparer.Equals(t.Command, command));
}
private List<TrackedCommand> Running(Type type = null)
{
var running = AllTracked().Where(i => i.State == ProcessState.Running);
if (type != null)
{
return running.Where(t => t.Type == type.FullName).ToList();
}
return running.ToList();
}
private void Store(TrackedCommand trackedCommand)
{
if (trackedCommand.Command.GetType() == typeof(TrackedCommandCleanupCommand))
{
return;
}
_cache.Set(trackedCommand.Command.CommandId, trackedCommand);
}
public void Execute(TrackedCommandCleanupCommand message)
{
var old = AllTracked().Where(c => c.State != ProcessState.Running && c.StateChangeTime < DateTime.UtcNow.AddMinutes(-5));
foreach (var trackedCommand in old)
{
_cache.Remove(trackedCommand.Command.CommandId);
}
}
}
}
@@ -1,19 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace NzbDrone.Common.Messaging.Tracking
{
public class ExistingCommand
{
public Boolean Existing { get; set; }
public TrackedCommand TrackedCommand { get; set; }
public ExistingCommand(Boolean exisitng, TrackedCommand trackedCommand)
{
Existing = exisitng;
TrackedCommand = trackedCommand;
}
}
}
@@ -1,14 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace NzbDrone.Common.Messaging.Tracking
{
public enum ProcessState
{
Running,
Completed,
Failed
}
}
@@ -1,30 +0,0 @@
using System;
namespace NzbDrone.Common.Messaging.Tracking
{
public class TrackedCommand
{
public String Id { get; private set; }
public String Name { get; private set; }
public String Type { get; private set; }
public ICommand Command { get; private set; }
public ProcessState State { get; set; }
public DateTime StateChangeTime { get; set; }
public TimeSpan Runtime { get; set; }
public Exception Exception { get; set; }
public TrackedCommand()
{
}
public TrackedCommand(ICommand command, ProcessState state)
{
Id = command.CommandId;
Name = command.GetType().Name;
Type = command.GetType().FullName;
Command = command;
State = state;
StateChangeTime = DateTime.UtcNow;
}
}
}
@@ -1,17 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace NzbDrone.Common.Messaging.Tracking
{
public class TrackedCommandCleanupCommand : ICommand
{
public string CommandId { get; private set; }
public TrackedCommandCleanupCommand()
{
CommandId = HashUtil.GenerateCommandId();
}
}
}
+3 -23
View File
@@ -92,16 +92,11 @@
<Compile Include="IEnumerableExtensions.cs" />
<Compile Include="Instrumentation\GlobalExceptionHandlers.cs" />
<Compile Include="Instrumentation\ExceptronTarget.cs" />
<Compile Include="Instrumentation\LogEventExtensions.cs" />
<Compile Include="Instrumentation\NzbDroneLogger.cs" />
<Compile Include="Instrumentation\LogTargets.cs" />
<Compile Include="Instrumentation\LoggerExtensions.cs" />
<Compile Include="Messaging\Tracking\ProcessState.cs" />
<Compile Include="Messaging\Tracking\CommandTrackingService.cs" />
<Compile Include="Messaging\Tracking\ExistingCommand.cs" />
<Compile Include="Messaging\Tracking\TrackedCommand.cs" />
<Compile Include="Messaging\Events\CommandStartedEvent.cs" />
<Compile Include="Messaging\CommandEqualityComparer.cs" />
<Compile Include="Messaging\Tracking\TrackedCommandCleanupCommand.cs" />
<Compile Include="Messaging\IEvent.cs" />
<Compile Include="Messaging\IMessage.cs" />
<Compile Include="PathEqualityComparer.cs" />
<Compile Include="Services.cs" />
<Compile Include="TPL\LimitedConcurrencyLevelTaskScheduler.cs" />
@@ -109,20 +104,8 @@
<Compile Include="StringExtensions.cs" />
<Compile Include="EnsureThat\TypeParam.cs" />
<Compile Include="HashUtil.cs" />
<Compile Include="Instrumentation\LogEventExtensions.cs" />
<Compile Include="Instrumentation\LogglyTarget.cs" />
<Compile Include="Serializer\Json.cs" />
<Compile Include="Messaging\Events\CommandCompletedEvent.cs" />
<Compile Include="Messaging\Events\CommandExecutedEvent.cs" />
<Compile Include="Messaging\Events\CommandFailedEvent.cs" />
<Compile Include="Messaging\IExecute.cs" />
<Compile Include="Messaging\ICommand.cs" />
<Compile Include="Messaging\IMessage.cs" />
<Compile Include="Messaging\IProcessMessage.cs" />
<Compile Include="Messaging\MessageAggregator.cs" />
<Compile Include="Messaging\IEvent.cs" />
<Compile Include="Messaging\IMessageAggregator.cs" />
<Compile Include="Messaging\IHandle.cs" />
<Compile Include="Expansive\CircularReferenceException.cs" />
<Compile Include="Expansive\Expansive.cs" />
<Compile Include="Expansive\PatternStyle.cs" />
@@ -130,9 +113,6 @@
<Compile Include="Expansive\TreeNode.cs" />
<Compile Include="Expansive\TreeNodeList.cs" />
<Compile Include="Instrumentation\VersionLayoutRenderer.cs" />
<Compile Include="Messaging\MessageExtensions.cs" />
<Compile Include="Messaging\TestCommand.cs" />
<Compile Include="Messaging\TestCommandExecutor.cs" />
<Compile Include="Reflection\ReflectionExtensions.cs" />
<Compile Include="ServiceFactory.cs" />
<Compile Include="HttpProvider.cs" />