Рефакторинг Plaza коннектора
Atom Ответить
16.09.2011


У нас там ещё и Reflection используется...

Давайте постепенно будем менять структуру.
Какие цели - избавиться от текущей перегруженности, добавить возможность поддержать несколько подключений.

Какие у кого предложения по структуре?

Теги:


Спасибо:



Скидка 15% на все обучение до 5 апреля (осталось 2 дней).

36 Ответов
< 1 2 
Alexander

Фотография
Дата: 27.09.2011
Ответить


frontman Перейти
Я думаю даже 100мс скоро много будет)

С чего мало? Эти 100мс передаются в ProcessMessage. Для чего они там нужны - см. документацию плазы. Хотите - измените на поменьше, это делайте через свойство StreamTimeOut.

Цитата:

У меня вопрос : а не может ли тот факт что транзакции и получение данных идет в одном потоке мешать выполнению этих самых транзакций.
Т.е я говорю не о задержки как писал Александр а о полной блокировки. Например при получении данных о стакане? Т.к. стакан обновляется постоянно, т.е поток входной инфы практически бесконечен?


Так и происходит. Только у нас 2 коннекшена - 1 для данных, 1 - для транзакций. Я по коду проверил, всё должно быть в порядке.
Вы протестировали с изменениями? Как результаты?
Автор топика
Спасибо:

frontman

Фотография
Дата: 27.09.2011
Ответить


Ну могу сказать точно что заявки стали быстрее выставляться... Но моя проблема с получением ответа от биржи о выставлении заявки сохранилась(
Спасибо:

frontman

Фотография
Дата: 27.09.2011
Ответить


Александр а вы пробовали одновременно создавать вот такие правила?

Код
this.When(Security.MarketDepthChanged())
                .Do<MarketDepth>(MarketDepthChanged);

this.When(this.StrategyNewOrder())
    .Do<Order>(order => this.AddOrderInfoLog(order, "Выставленна"));


Спасибо:

FiNick

Фотография
Благотворитель
Дата: 06.10.2011
Ответить


Повторно подниму эту тему.
На сколько я понимаю рефакторинг позволил создавать множество коннекшнов до гейта и соответственно обрабатывать потоки репликации в нескольких тредах. Похоже это не роскошь, а необходимость.
Я отсылал в РТС логи, они сделали такие замечания:

По приложенному логу видно, что накапливается очередь сообщений в первые секунды работы.
2011-10-06 16:45:47.234;p2mq-cli;;New message added to recvList. Size: 38
Это приводит к задержкам в получении данных, поэтому предлагается побороться с очередями. Для этого предлагается разбить получение реплики на несколько соединений, работающих в отдельных thread'ах, каждое со своим циклом выборки.


P.S. Начал прикидывать как это можно сделать, пока не получается, все ломается
Спасибо:

Alexander

Фотография
Дата: 06.10.2011
Ответить


FiNick Перейти
Повторно подниму эту тему.
На сколько я понимаю рефакторинг позволил создавать множество коннекшнов до гейта и соответственно обрабатывать потоки репликации в нескольких тредах. Похоже это не роскошь, а необходимость.
Я отсылал в РТС логи, они сделали такие замечания:

По приложенному логу видно, что накапливается очередь сообщений в первые секунды работы.
2011-10-06 16:45:47.234;p2mq-cli;;New message added to recvList. Size: 38
Это приводит к задержкам в получении данных, поэтому предлагается побороться с очередями. Для этого предлагается разбить получение реплики на несколько соединений, работающих в отдельных thread'ах, каждое со своим циклом выборки.


P.S. Начал прикидывать как это можно сделать, пока не получается, все ломается


несколько соединений - connection pool
а рефакторинг позволил создавать несколько PlazaTrader
Автор топика
Спасибо:

frontman

Фотография
Дата: 16.12.2011
Ответить


Продолжаю свое капание в коде плазы)
У меня вызвал некое смущение вот этот метод:
Код
internal void ProcessConnection(IConnectionWrapper connection)
		{
			switch (State)
			{
				case PlazaStreamStates.Closed:
					if (ConfigFileName.IsEmpty())
						throw new InvalidOperationException("ConfigFileName is null or empty");

					NativeStream.TableSet = WrapperHelper.CreateTableSet();
					NativeStream.TableSet.Init(ConfigFileName, Table.ReplicationScheme);

					SetRevision();

					State = PlazaStreamStates.Opened;

					Log.SafeInvoke(this, "Поток открыт.");
					break;
				case PlazaStreamStates.Opened:
					if (NativeStream.State == StreamWrapperStates.Error || NativeStream.State == StreamWrapperStates.Closed)
					{
						Log.SafeInvoke(this, "Состояние потока {0}.".Put(NativeStream.State));

						if (NativeStream.State == StreamWrapperStates.Error)
							NativeStream.Close();

						SetRevision();

						NativeStream.Open(connection);
						ReConnected.SafeInvoke();

						Log.SafeInvoke(this, "Поток переоткрыт.");
					}
					break;
				default:
					throw new ArgumentOutOfRangeException();
			}
		}


Во первых я считаю что его надо разбить на несколько независимых методов.
А во вторых с точки зрения логики для меня он непонятен.
В первой ветке case PlazaStreamStates.Closed: в лог пишется что поток открыт и статус у потока меняется на Opened, но на самом деле это не так. Поток еще не открыт. Происходит только загрузка схемы для потока.
Такая же ситуации ветке case PlazaStreamStates.Opened: В лог пишется "поток переоткрыт" , хотя на самом деле это только его первое открытие(за исключением его реальных переоткрытиях в случае необходимости).
Спасибо:

Alexander

Фотография
Дата: 16.12.2011
Ответить


frontman Перейти
Продолжаю свое капание в коде плазы)
У меня вызвал некое смущение вот этот метод:
Код
internal void ProcessConnection(IConnectionWrapper connection)
		{
			switch (State)
			{
				case PlazaStreamStates.Closed:
					if (ConfigFileName.IsEmpty())
						throw new InvalidOperationException("ConfigFileName is null or empty");

					NativeStream.TableSet = WrapperHelper.CreateTableSet();
					NativeStream.TableSet.Init(ConfigFileName, Table.ReplicationScheme);

					SetRevision();

					State = PlazaStreamStates.Opened;

					Log.SafeInvoke(this, "Поток открыт.");
					break;
				case PlazaStreamStates.Opened:
					if (NativeStream.State == StreamWrapperStates.Error || NativeStream.State == StreamWrapperStates.Closed)
					{
						Log.SafeInvoke(this, "Состояние потока {0}.".Put(NativeStream.State));

						if (NativeStream.State == StreamWrapperStates.Error)
							NativeStream.Close();

						SetRevision();

						NativeStream.Open(connection);
						ReConnected.SafeInvoke();

						Log.SafeInvoke(this, "Поток переоткрыт.");
					}
					break;
				default:
					throw new ArgumentOutOfRangeException();
			}
		}


Во первых я считаю что его надо разбить на несколько независимых методов.
А во вторых с точки зрения логики для меня он непонятен.
В первой ветке case PlazaStreamStates.Closed: в лог пишется что поток открыт и статус у потока меняется на Opened, но на самом деле это не так. Поток еще не открыт. Происходит только загрузка схемы для потока.
Такая же ситуации ветке case PlazaStreamStates.Opened: В лог пишется "поток переоткрыт" , хотя на самом деле это только его первое открытие(за исключением его реальных переоткрытиях в случае необходимости).



1) на какие методы и зачем? разбивать имеет смысл если этот код может использоваться в других местах. Здесь так?
2) Вас смущает надписать "поток открыт"? ну считайте это "поток открывается"
3) здесь поток переоткрывается, т.к. его статус либо Error, либо Closed

Вы куда-то не туда смотрите. То что сейчас не работает - смена сессий. Прошла неделя с последней нашей беседы на эту тему, вы как-то продвинулись?
Автор топика
Спасибо:

frontman

Фотография
Дата: 16.12.2011
Ответить


Александр я как раз туда смотрю.
И уже много раз вам говорил что вот такое считайте, как бы понятно для вас , но не для тех кто этот код не писал. Все должно быть однозначно.
Загрузка схемы - это еще не подключение. Открытие потока и переоткрытие то же разное....

Насчет смены сесии: скажу честно вопрос пока что отложен, т.к. у меня возникли проблемы с прокачкой данных, что мешает работе робота за которого мне собственно зп платят)) Так что пока что извините...
Спасибо:

frontman

Фотография
Дата: 16.12.2011
Ответить


Но это не значит что я отказываюсь от этого вопроса!
Спасибо:

Alexander

Фотография
Дата: 16.12.2011
Ответить


frontman Перейти
Александр я как раз туда смотрю.
И уже много раз вам говорил что вот такое считайте, как бы понятно для вас , но не для тех кто этот код не писал. Все должно быть однозначно.
Загрузка схемы - это еще не подключение. Открытие потока и переоткрытие то же разное....


Вопросы я задал.
Если хотите от меня реакцию получить - на вопросы надо ответить :)
Автор топика
Спасибо:

frontman

Фотография
Дата: 16.12.2011
Ответить


Alexander Mukhanchikov Перейти

1) на какие методы и зачем? разбивать имеет смысл если этот код может использоваться в других местах. Здесь так?

На методы Initialization, Open, ReOpen.
Код разбивают не только в случае его исп в других местах, а еще и с целью логического упрощения.
Alexander Mukhanchikov Перейти

2) Вас смущает надписать "поток открыт"? ну считайте это "поток открывается"

Он не открывается, а инициализируеться схема. Поток при этом не обязательно открывать.
Alexander Mukhanchikov Перейти

Вы куда-то не туда смотрите. То что сейчас не работает - смена сессий. Прошла неделя с последней нашей беседы на эту тему, вы как-то продвинулись?

Ну собственно я ответил что нет.
Спасибо:
< 1 2 

Добавить файлы через драг-н-дроп, , или вставить из буфера обмена.

loading
clippy