//PlazaConnectionPool.cs //Copyright (c) 2013 StockSharp LLC, all rights reserved. //This code module is part of StockSharp library. //This code is licensed under the GNU GENERAL PUBLIC LICENSE Version 3. //See the file License.txt for the license details. //More info on: http://stocksharp.com namespace StockSharp.Plaza { using System; using System.Linq; using System.Net; using System.Runtime.InteropServices; using Ecng.Collections; using Ecng.Common; using StockSharp.Logging; using StockSharp.Plaza.Wrappers; class PlazaConnectionPool : Disposable { private readonly TimeSpan _errorSleepInterval = TimeSpan.FromMilliseconds(2000); private readonly SynchronizedList _connections = new SynchronizedList(); private bool _connectDone; private readonly Action _connected; private readonly Action _disconnected; private readonly Action _connectionError; private readonly Action _processError; private readonly Action _log; public PlazaConnectionPool(Action connected, Action disconnected, Action connectionError, Action processError, Action log) { if (connected == null) throw new ArgumentNullException("connected"); if (disconnected == null) throw new ArgumentNullException("disconnected"); if (connectionError == null) throw new ArgumentNullException("connectionError"); if (processError == null) throw new ArgumentNullException("processError"); if (log == null) throw new ArgumentNullException("log"); _connected = connected; _disconnected = disconnected; _connectionError = connectionError; _processError = processError; _log = log; AppName = "SS"; //TODO: проверить PollTimeOut = 1 сек PollTimeOut = TimeSpan.FromMilliseconds(100); UseLocalProtocol = false; } //Необходимо ли выполнять ProcessMessage() после отключения. //Если ProcessMessage() выполняется при отсуствии подключения - сильно тормозится система. private bool _processMessageNeeded; private string _login; public string Login { get { return _login; } set { _login = value; ChangeConnection(connection => connection.LoginStr = LoginStr); } } private string _password; public string Password { get { return _password; } set { _password = value; // если соединение устанавливается локально (пароль прописан в разделе AS:Local) if (Login.IsEmpty()) ChangeConnection(connection => connection.Password = value ?? string.Empty); // если соединение передает логин и пароль к счету через себя else ChangeConnection(connection => connection.LoginStr = LoginStr); } } private EndPoint _address; public EndPoint Address { get { return _address; } set { if (value == null) throw new ArgumentNullException("value"); _address = value; ChangeConnection(ApplyAddress); } } private string _appName; public string AppName { get { return _appName; } set { if (value == null) throw new ArgumentNullException("value"); // P2ClientGate.doc - стр. 8: // Из одного процесса можно создавать любое количество локальных соединений к одному роутеру или нескольким роутерам. // При этом надо соблюдать следующие правила: Каждое соединение должно иметь уникальный AppName в пределах роутера. // Appname используется в сети для идентификации конечного отправителя-получателя сообщения. // Не рекомендуется придумывать слишком длинные AppName – при работе online размеры пакетов данных невелики, // и служебная информация, частью которой является AppName может превысить «полезную нагрузку» в пакете. var index = 0; ChangeConnection(connection => connection.AppName = value + "_" + index++); _appName = value; } } public bool UseLocalProtocol { get; set; } public bool IsConnected { get; private set; } private uint _pollTimeOut; public TimeSpan PollTimeOut { get { return TimeSpan.FromMilliseconds(_pollTimeOut); } set { if (value < TimeSpan.Zero) throw new ArgumentOutOfRangeException("value", value, @"Неправильное значение."); _pollTimeOut = (uint)value.TotalMilliseconds; } } private string _srvAddress; public string SrvAddress { get { return _srvAddress ?? (_srvAddress = FirstConnection.ResolveService("FORTS_SRV")); } } private IConnectionWrapper FirstConnection { get { return _connections.First(); } } private string LoginStr { get { return "USERNAME={0};PASSWORD={1}".Put(Login, Password); } } private bool RequireAuthentication { get { // авторизацию нужно делать только в случае передачи и логина и пароля return (!Login.IsEmpty() && !Password.IsEmpty()); } } private bool IsLocal { get { if (Address is IPEndPoint) return IPAddress.IsLoopback(((IPEndPoint)Address).Address); else if (Address is DnsEndPoint) return ((DnsEndPoint)Address).Host.CompareIgnoreCase("localhost"); else throw new InvalidOperationException("Неизвестная информация об адресе."); } } public PlazaConnection RunConnection(string name, Action initialize, Action> pollAction, Action finalize) { if (initialize == null) throw new ArgumentNullException("initialize"); if (pollAction == null) throw new ArgumentNullException("pollAction"); if (finalize == null) throw new ArgumentNullException("finalize"); var connection = new PlazaConnection(); connection.Thread = ThreadingHelper.Thread(() => SafeDo(() => { var isFirstConnection = false; connection.NativeConnection = _connections.SyncGet(connections => { var с = WrapperHelper.CreateConnection(); ApplyAddress(с); с.AppName = AppName + "_" + _connections.Count; if (!Password.IsEmpty()) { if (Login.IsEmpty()) с.Password = Password; else с.LoginStr = LoginStr; } isFirstConnection = _connections.IsEmpty(); // Подписываемся только один раз, иначе в PlazaTrader события будут повторяться по кол-ву подключений if (isFirstConnection) с.ConnectionStatusChanged += OnConnectionStatusChanged; с.ConnectionStatusChanged += TraceConnectionStatus; _connections.Add(с); return с; }); try { // если Connect был вызван до создания соединения if (_connectDone) Connect(connection.NativeConnection, isFirstConnection); initialize(connection); while (connection.Runned) { //Выполняем получение сообщений либо пока подключены, либо - если установлен флаг (при подключении) if (_processMessageNeeded || IsConnected) { var error = SafeDo(() => { pollAction(connection, _processError); //ProcessMessage может также кидать exception connection.NativeConnection.ProcessMessage(PollTimeOut); }); if (error != null || connection.NativeConnection.Status != (ConnectionWrapperStatus.Connected | ConnectionWrapperStatus.RouterConnected)) { _errorSleepInterval.Sleep(); } } else { PollTimeOut.Sleep(); } } finalize(connection); } finally { Close(connection.NativeConnection); if (isFirstConnection) connection.NativeConnection.ConnectionStatusChanged -= OnConnectionStatusChanged; connection.NativeConnection.ConnectionStatusChanged -= TraceConnectionStatus; } }, connection.RaiseStopped)) .Name(name); return connection; } private void TraceConnectionStatus(IConnectionWrapper connection, ConnectionWrapperStatus status) { _log(LogLevels.Info, "OnConnectionStatusChanged: conn {0} - status: {1}".Put(connection.AppName, status)); } // При обрыве связи нужно: // - закрыть все потоки (+) // - вызвать метод Disconnect (+) // - вызывать метод Connect до достижения успеха // - открыть все потоки. // Перед последним пунктом можно сделать InitFromIni и задать последний полученный ревижен, // чтобы быть уверенным, что схема и номер жизни за время восстановления не изменились, и начать получать данные с последнего запомненного ревижена (+1) private void OnConnectionStatusChanged(IConnectionWrapper connection, ConnectionWrapperStatus status) { if (status.Contains(ConnectionWrapperStatus.RouterLoginFailed) || status.Contains(ConnectionWrapperStatus.RouterNoConnect)) { if (IsConnected) { IsConnected = false; _connectionError(new InvalidOperationException("Роутер не смог установить соединение. Состояние соединения {0}.".Put(status))); } if (RequireAuthentication) { connection.Logout(); connection.Login(); } return; } else if (status.Contains(ConnectionWrapperStatus.RouterDisconnected) || status.Contains(ConnectionWrapperStatus.RouterReconnecting)) { if (IsConnected) { IsConnected = false; _connectionError(new InvalidOperationException("Роутер разорвал соединение. Состояние соединения {0}.".Put(status))); } return; } else if (status.Contains(ConnectionWrapperStatus.Disconnected)) { if (IsConnected) { IsConnected = false; _disconnected(); } return; } // (константа CS_CONNECTION_INVALID) — нарушен протокол работы соединения, дальнейшая работа возможна только // после повторной установки соединения. Допустима одновременная установка битов CS_CONNECTION_CONNECTED и // CS_CONNECTION_INVALID. else if (status.Contains(ConnectionWrapperStatus.Invalid)) { if (IsConnected) { IsConnected = false; _connectionError(new InvalidOperationException("Соединение с роутером потеряно. Состояние соединения {0}.".Put(status))); } return; } else if (status.Contains(ConnectionWrapperStatus.Connected) && status.Contains(ConnectionWrapperStatus.RouterConnected)) { if (!IsConnected) { IsConnected = true; _connected(); } return; } else if (status.Contains(ConnectionWrapperStatus.Busy)) { return; } throw new ArgumentOutOfRangeException("status", status, "Неизвестное состояние соединения {0}.".Put(status)); } public void Connect() { _connectDone = true; var isFirstConnection = true; ChangeConnection(c => { Connect(c, isFirstConnection); isFirstConnection = false; }); } private void Connect(IConnectionWrapper connection, bool isFirstConnection) { if (connection == null) throw new ArgumentNullException("connection"); try { _processMessageNeeded = true; // P2ClientGate.doc - стр. 7: // Для локальных соединений возможно подключение: // c помощью протокола LRPCQ. LRPCQ является простым транспортом, основанным на использовании shared-memory в ОС Windows. // Использование LRPCQ возможно только при запуске приложения-клиента и роутера на одном компьютере. // Протокол LRPCQ имеет меньшие накладные расходы, чем TCP, // передача сообщений между приложением и роутером с использованием LRPCQ будет происходить быстрее. var appName = connection.AppName; if (UseLocalProtocol && IsLocal) connection.Connect("AppName={0};LRPCQ_PORT={1}".Put(appName, connection.Port)); else connection.Connect(); if (RequireAuthentication) connection.Login(); _log(LogLevels.Info, "Attempted to connect {0}.".Put(appName)); } catch (COMException ex) { Disconnect(connection); var error = new PlazaException("Ошибка подключения к серверу Плазы.", ex); if (isFirstConnection) _connectionError(error); else throw error; } } public void Disconnect() { _connectDone = false; ChangeConnection(Disconnect); _disconnected(); IsConnected = false; } private void Disconnect(IConnectionWrapper connection) { if (connection == null) throw new ArgumentNullException("connection"); SafeDo(() => { _processMessageNeeded = false; if (RequireAuthentication) connection.Logout(); connection.Disconnect(); }); } private void Close(IConnectionWrapper connection) { Disconnect(connection); connection.ConnectionStatusChanged -= TraceConnectionStatus; _connections.Remove(connection); } private Exception SafeDo(Action action, Action final = null) { if (action == null) throw new ArgumentNullException("action"); Exception error = null; try { action(); } catch (COMException ex) { error = new PlazaException("Ошибка Плазы.", ex); _processError(error); } catch (Exception ex) { error = ex; _processError(ex); } finally { final.SafeInvoke(); } return error; } private void ChangeConnection(Action handler) { if (handler == null) throw new ArgumentNullException("handler"); _connections.SyncDo(c => c.ForEach(handler)); } private void ApplyAddress(IConnectionWrapper connection) { if (Address is IPEndPoint) { var ip = (IPEndPoint)Address; connection.Host = ip.Address.ToString(); connection.Port = (uint)ip.Port; } else if (Address is DnsEndPoint) { var dns = (DnsEndPoint)Address; connection.Host = dns.Host; connection.Port = (uint)dns.Port; } else throw new InvalidOperationException("Неизвестная информация об адресе."); } protected override void DisposeNative() { Disconnect(); _connections.Clear(); } } }