// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.md in the project root for license information. using System; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Diagnostics; using System.Threading.Tasks; using Microsoft.AspNet.SignalR.Infrastructure; namespace Microsoft.AspNet.SignalR.Messaging { internal class ScaleoutStreamManager { private readonly Func, Task> _send; private readonly Action _receive; private readonly ScaleoutStream[] _streams; public ScaleoutStreamManager(Func, Task> send, Action receive, int streamCount, TraceSource trace, IPerformanceCounterManager performanceCounters, ScaleoutConfiguration configuration) { _streams = new ScaleoutStream[streamCount]; _send = send; _receive = receive; var receiveMapping = new ScaleoutMappingStore[streamCount]; performanceCounters.ScaleoutStreamCountTotal.RawValue = streamCount; performanceCounters.ScaleoutStreamCountBuffering.RawValue = streamCount; performanceCounters.ScaleoutStreamCountOpen.RawValue = 0; for (int i = 0; i < streamCount; i++) { _streams[i] = new ScaleoutStream(trace, "Stream(" + i + ")", configuration.MaxQueueLength, performanceCounters); receiveMapping[i] = new ScaleoutMappingStore(); } Streams = new ReadOnlyCollection(receiveMapping); } public IList Streams { get; private set; } public void Open(int streamIndex) { _streams[streamIndex].Open(); } public void Close(int streamIndex) { _streams[streamIndex].Close(); } public void OnError(int streamIndex, Exception exception) { _streams[streamIndex].SetError(exception); } public Task Send(int streamIndex, IList messages) { var context = new SendContext(this, streamIndex, messages); return _streams[streamIndex].Send(state => Send(state), context); } public void OnReceived(int streamIndex, ulong id, ScaleoutMessage message) { _receive(streamIndex, id, message); // We assume if a message has come in then the stream is open Open(streamIndex); } private static Task Send(object state) { var context = (SendContext)state; return context.StreamManager._send(context.Index, context.Messages); } private class SendContext { public ScaleoutStreamManager StreamManager; public int Index; public IList Messages; public SendContext(ScaleoutStreamManager scaleoutStream, int index, IList messages) { StreamManager = scaleoutStream; Index = index; Messages = messages; } } } }