Results 1 to 10 of 10

Hybrid View

  1. #1
    Member
    Join Date
    Mar 2008
    Location
    Hồ Chí Minh
    Posts
    7
    May be possible to this time is spent in "subscribe" method of my Remote Adapter. because when I subscribe about 20 - 30 item, It OK. but if I subscribe more 40 or 50 item I get this warning.
    Each item I create one thread to reply data.
    My code for Remote Adapter, just edit your stockdemo:

    using System;
    using System.Collections;
    using System.Threading;

    using Lightstreamer.Interfaces.Data;
    using Lightstreamer.Adapters.Data.HaSTC;

    namespace Lightstreamer.Adapters.Data
    {
    public interface IExternalFeedListener
    {
    /// <summary>
    ///
    /// </summary>
    /// <param name="itemName"></param>
    /// <param name="currentValues"></param>
    /// <param name="isSnapshot"></param>
    void OnEvent(string itemName, IDictionary currentValues, bool isSnapshot);
    }
    public class StocksAdapter:IDataProvider, IExternalFeedListener
    {
    private IItemEventListener _listener;
    private IDictionary _subscribedItems;
    private FileDataFeeder _myfeeder;

    public StocksAdapter()
    {
    _subscribedItems = new Hashtable();
    _myfeeder = new FileDataFeeder(@"\\192.168.4.18\BroadcastServiceXm l2");
    }

    //-------------IDataProvider Install-----------------
    public void Init(IDictionary parameters, string configFile)
    {
    _myfeeder.setFeederListener(this);
    _myfeeder.Start();
    _myfeeder.WatcherStart();
    }
    public void SetListener(IItemEventListener listener)
    {
    _listener = listener;
    }
    public void Subscribe(string itemName)
    {
    if (!itemName.StartsWith("item"))
    throw new SubscriptionException("Unexpected item: " + itemName);
    lock (_subscribedItems)
    {
    if (_subscribedItems.Contains(itemName)) return;
    _subscribedItems[itemName] = false;
    }

    //SubscribeStockInfo sub = new SubscribeStockInfo(itemName,null);
    //sub.setListener(this);
    //sub.Start();

    _myfeeder.SendCurrentValues(itemName);
    }
    public void Unsubscribe(string itemName)
    {
    if (!itemName.StartsWith("item"))
    throw new SubscriptionException("Unexpected item: " + itemName);

    lock (_subscribedItems)
    {
    _subscribedItems.Remove(itemName);
    }
    }
    public bool IsSnapshotAvailable(string itemName)
    {
    if (!itemName.StartsWith("item"))
    throw new SubscriptionException("Unexpected item: " + itemName);
    return true;
    }

    //--------IExternalFeedListener Install----

    public void OnEvent(string itemName,
    IDictionary currentValues,
    bool isSnapshot)
    {
    lock (_subscribedItems)
    {
    if (!_subscribedItems.Contains(itemName)) return;

    bool started = (bool)_subscribedItems[itemName];
    if (!started)
    {
    if (!isSnapshot)
    return;
    _subscribedItems[itemName] = true;
    }
    else
    {
    if (isSnapshot)
    {
    isSnapshot = false;
    }
    }
    _listener.Update(itemName, currentValues, isSnapshot);
    }
    }
    }
    }


    And My FileDataFeeder class

    using System;
    using System.Collections;
    using System.IO;
    using System.Threading;

    namespace Lightstreamer.Adapters.Data.HaSTC
    {
    class FileDataFeeder
    {
    private IExternalFeedListener _listener;
    private FileSystemWatcher watcher;
    private Thread _thdListen;
    private StockInfoColection _stockCollection;
    private TopPriceCollection _topPriceCollection;
    private MarketInfo _marketInfo;
    private IDictionary _stockGenerators;
    private IList _snapshotQueue;
    private string _destDrirectory = "HaSTC";

    public FileDataFeeder(string directory)
    {
    watcher = new FileSystemWatcher(directory,"*.XML");
    watcher.NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.LastAccess;
    watcher.Changed += new FileSystemEventHandler(OnFileChanged);

    _stockGenerators = new Hashtable();
    _snapshotQueue = new ArrayList();

    _stockCollection = new StockInfoColection();
    _topPriceCollection = new TopPriceCollection();
    _marketInfo = new MarketInfo();
    }
    /// <summary>
    /// Set event listener for data feeder.
    /// </summary>
    /// <param name="eventListener"></param>
    public void setFeederListener(IExternalFeedListener eventListener)
    {
    _listener = eventListener;
    foreach (SubscribeStockInfo subscribe in _stockGenerators.Values)
    subscribe.setListener(_listener);
    }
    /// <summary>
    /// Changed event of file watcher
    /// </summary>
    /// <param name="sender"></param>
    /// <param name="e"></param>
    private void OnFileChanged(object sender, FileSystemEventArgs e)
    {
    switch (e.Name)
    {
    case "STS_StocksInfo.XML":
    {
    //File.Copy(e.FullPath, _destDrirectory+@"\"+e.Name,true );
    _stockCollection.ReadData(e.FullPath);
    foreach(SubscribeStockInfo _info in _stockGenerators.Values)
    {
    //if (_info.ItemName == "item1")
    // Console.WriteLine("Data Feeder ACB MatchPrice:{0}",_info.Stock.MatchPrice);
    if (_info.ItemName != "item0")
    {

    for (int j = 0; j < _stockCollection.Stocks.Count; j++)
    {
    StockInfo sinfo = (StockInfo)_stockCollection.Stocks[j];
    if (_info.Stock.StockNo == sinfo.StockNo)
    {
    _info.Stock = sinfo;
    break;
    }
    }
    }
    }
    break;
    }
    case "STS_TOP3_PRICE_A.XML":
    {
    //File.Copy(e.FullPath, _destDrirectory + @"\" + e.Name, true);
    _topPriceCollection.ReadData(e.FullPath);
    foreach (SubscribeStockInfo _info in _stockGenerators.Values)
    {
    if (_info.ItemName != "item0")
    _info.Topprice = _info.Stock.GetTopPrice(_topPriceCollection);
    }
    break;
    }
    case "STS_MarketInfo.XML":
    {
    //File.Copy(e.FullPath, _destDrirectory + @"\" + e.Name, true);
    _marketInfo.ReadData(e.FullPath);
    foreach (SubscribeStockInfo _info in _stockGenerators.Values)
    {
    if (_info.ItemName == "item0")
    {
    _info.Marketinfo = _marketInfo;
    break;
    }
    }
    break;
    }
    }
    }
    /// <summary>
    /// Start watcher file system
    /// </summary>
    public void WatcherStart()
    {
    if (!watcher.EnableRaisingEvents)
    watcher.EnableRaisingEvents = true;
    }

    /// <summary>
    /// Start thread listen a new client
    /// </summary>
    public void Start()
    {
    _stockCollection.ReadData(watcher.Path + @"\STS_StocksInfo.XML");
    _topPriceCollection.ReadData(watcher.Path + @"\STS_TOP3_PRICE_A.XML");
    _marketInfo.ReadData(watcher.Path + @"\STS_MarketInfo.XML");
    if (_thdListen != null) return;

    StockInfo mstock = (StockInfo)_stockCollection.Stocks[0];
    SubscribeStockInfo _market = new SubscribeStockInfo("item0", new StockInfo(), new TopPriceInfo() , _marketInfo);
    _stockGenerators["item0"] = _market;
    _market.setListener(_listener);
    _market.Start();

    for(int i=0; i < _stockCollection.Stocks.Count; i++)
    {
    string itemName = "item"+(i+1);
    StockInfo stock = (StockInfo)_stockCollection.Stocks[i];
    SubscribeStockInfo _subscibeinfo = new SubscribeStockInfo(itemName,stock ,stock.GetTopPrice(_topPriceCollection), _marketInfo);
    _stockGenerators[itemName] = _subscibeinfo;
    _subscibeinfo.setListener(_listener);
    _subscibeinfo.Start();
    }
    _thdListen = new Thread(new ThreadStart(Run));
    _thdListen.Start();
    }



    private void Run()
    {
    IList snapshots = new ArrayList();
    do
    {
    lock (_snapshotQueue)
    {
    if (_snapshotQueue.Count == 0)
    Monitor.Wait(_snapshotQueue);
    snapshots.Clear();
    while (_snapshotQueue.Count > 0)
    {
    SubscribeStockInfo mysubscribe = (SubscribeStockInfo)_snapshotQueue[0];
    snapshots.Add(mysubscribe);
    _snapshotQueue.RemoveAt(0);
    }
    foreach(SubscribeStockInfo _subscride in snapshots)
    _listener.OnEvent(_subscride.ItemName, _subscride.getCurrentValue(true), true);
    }
    } while (true);
    }

    /// <summary>
    /// Forces sending an event with a full snapshot for a stock.
    /// </summary>
    public void SendCurrentValues(string itemName)
    {
    SubscribeStockInfo myProducer = (SubscribeStockInfo)_stockGenerators[itemName];
    if (myProducer == null) return;

    lock (_snapshotQueue)
    {
    _snapshotQueue.Add(myProducer);
    Monitor.Pulse(_snapshotQueue);
    }
    }
    }
    }


    and SubscribeStockInfo class.
    using System;
    using System.Collections;
    using System.Threading;

    namespace Lightstreamer.Adapters.Data.HaSTC
    {
    class SubscribeStockInfo
    {

    private string _itemName;
    private StockInfo _stock;
    private TopPriceInfo _topprice;
    private MarketInfo _marketinfo;
    private Thread _thd;
    private IExternalFeedListener _listener;


    public string ItemName
    {
    get { return _itemName; }
    }
    public StockInfo Stock
    {
    get { return _stock; }
    set { _stock = value; }
    }
    public TopPriceInfo Topprice
    {
    get { return _topprice; }
    set { _topprice = value; }
    }
    public MarketInfo Marketinfo
    {
    get { return _marketinfo; }
    set { _marketinfo = value; }
    }
    public SubscribeStockInfo(string itemname)
    {
    _itemName = itemname;
    }
    public SubscribeStockInfo(string itemname, StockInfo info, TopPriceInfo topprice, MarketInfo market)
    {
    _itemName = itemname;
    _stock = info;
    _topprice = topprice;
    _marketinfo = market;
    }

    public void Start()
    {
    lock (this)
    {
    if (_thd != null) return;
    _thd = new Thread(new ThreadStart(Run));
    _thd.Start();
    }
    }
    public void setListener(IExternalFeedListener eventListener)
    {
    lock(this)
    {
    _listener = eventListener;
    }
    }
    private void Run()
    {
    //get new value from watcher
    do
    {
    if (_listener != null)
    _listener.OnEvent(_itemName, getCurrentValue(false), false);
    Thread.Sleep(500);
    } while (true);
    }
    public IDictionary getCurrentValue(bool isFull)
    {
    lock (this)
    {
    IDictionary eventItem = new Hashtable();
    Random rnd = new Random();
    if (_itemName.Equals("item0"))
    {
    eventItem["symbol"] = rnd.Next(100).ToString();
    eventItem["ref_price"] = rnd.Next(100).ToString();
    eventItem["flr_price"] = rnd.Next(100).ToString();
    eventItem["ceil_price"] = rnd.Next(100).ToString();
    eventItem["room"] = rnd.Next(100).ToString();
    eventItem["foreign_buy"] = rnd.Next(100).ToString();
    eventItem["foreign_sell"] = rnd.Next(100).ToString();
    eventItem["highest"] = rnd.Next(100).ToString();
    eventItem["lowest"] = rnd.Next(100).ToString();
    eventItem["everage"] = rnd.Next(100).ToString();
    eventItem["total_volume"] = rnd.Next(100).ToString();
    eventItem["match_volume"] = rnd.Next(100).ToString();
    return eventItem;
    }

    eventItem["symbol"] = "A";// _stock.Symbol;
    AddPriceField("ref_price",rnd.Next(100) , eventItem);
    AddPriceField("flr_price", rnd.Next(100), eventItem);
    AddPriceField("ceil_price", rnd.Next(100), eventItem);
    AddVolumeField("room", rnd.Next(100), eventItem);
    AddVolumeField("foreign_buy", rnd.Next(100), eventItem);
    AddVolumeField("foreign_sell", rnd.Next(100), eventItem);
    AddPriceField("highest", rnd.Next(100), eventItem);
    AddPriceField("lowest", rnd.Next(100), eventItem);
    AddPriceField("everage", rnd.Next(100), eventItem);
    AddVolumeField("total_volume", rnd.Next(100), eventItem);
    AddVolumeField("match_volume", rnd.Next(100), eventItem);
    AddPriceField("match_price", rnd.Next(100), eventItem);
    AddPriceField("changed", rnd.Next(100), eventItem);
    AddPriceField("bid3", rnd.Next(1000), eventItem);
    AddVolumeField("bidvol3", rnd.Next(100), eventItem);
    AddPriceField("bid2", rnd.Next(1000), eventItem);
    AddVolumeField("bidvol2", rnd.Next(100), eventItem);
    AddPriceField("bid1", rnd.Next(100), eventItem);
    AddVolumeField("bidvol1", rnd.Next(100), eventItem);
    AddPriceField("off1", rnd.Next(100), eventItem);
    AddVolumeField("offvol1", rnd.Next(100), eventItem);
    AddPriceField("off2", rnd.Next(100), eventItem);
    AddVolumeField("offvol2", rnd.Next(100), eventItem);
    AddPriceField("off3", rnd.Next(100), eventItem);
    AddVolumeField("offvol3", rnd.Next(100), eventItem);
    return eventItem;

    }
    }
    public IDictionary getCurrentValue_backup(bool isFull)
    {
    lock (this)
    {
    IDictionary eventItem = new Hashtable();
    if (_itemName.Equals("item0"))
    {
    eventItem["symbol"] = _marketinfo.Index.ToString().Trim();
    eventItem["ref_price"] = _marketinfo.Changed.ToString().Trim();
    eventItem["flr_price"] = _marketinfo.Pct_Changed.ToString().Trim();
    eventItem["ceil_price"] = _marketinfo.Total_Volume.ToString().Trim();
    eventItem["room"] = _marketinfo.Total_Value.ToString().Trim();
    eventItem["foreign_buy"] = _marketinfo.TradingDate.ToString().Trim();
    eventItem["foreign_sell"] = _marketinfo.Time.Trim();
    eventItem["highest"] = _marketinfo.DateNo.ToString().Trim();
    eventItem["lowest"] = _marketinfo.Status.ToString().Trim();
    eventItem["everage"] = _marketinfo.Advances.ToString().Trim();
    eventItem["total_volume"] = _marketinfo.Declines.ToString().Trim();
    eventItem["match_volume"] = _marketinfo.NoChange.ToString();
    return eventItem;
    }
    if (isFull)
    {
    eventItem["symbol"] = _stock.Symbol;
    AddPriceField("ref_price", _stock.RefPrice, eventItem);
    AddPriceField("flr_price", _stock.FlrPrice, eventItem);
    AddPriceField("ceil_price", _stock.CeilPrice, eventItem);
    }
    AddVolumeField("room", _stock.Currentroom, eventItem);
    AddVolumeField("foreign_buy", _stock.Foreign_buy, eventItem);
    AddVolumeField("foreign_sell", _stock.Foreign_sell, eventItem);
    AddPriceField("highest", _stock.Highest, eventItem);
    AddPriceField("lowest", _stock.Lowest, eventItem);
    if (_marketinfo.Status == 13 || _marketinfo.Status == 15)
    AddPriceField("everage", _stock.Average, eventItem);
    AddVolumeField("total_volume", _stock.Total_volume, eventItem);
    AddVolumeField("match_volume", _stock.MatchVolume, eventItem);
    if (_stock.MatchPrice != 0)
    {
    AddPriceField("match_price", _stock.MatchPrice, eventItem);
    AddPriceField("changed", _stock.MatchPrice - _stock.RefPrice, eventItem);
    }
    if (_topprice != null)
    {
    AddPriceField("bid3", _topprice.Third_best_bid_price, eventItem);
    AddVolumeField("bidvol3", _topprice.Third_best_bid_volume, eventItem);
    AddPriceField("bid2", _topprice.Second_best_bid_price, eventItem);
    AddVolumeField("bidvol2", _topprice.Second_best_bid_volume, eventItem);
    AddPriceField("bid1", _topprice.First_best_bid_price, eventItem);
    AddVolumeField("bidvol1", _topprice.First_best_bid_volume, eventItem);

    AddPriceField("off1", _topprice.First_best_offer_price, eventItem);
    AddVolumeField("offvol1", _topprice.First_best_offer_volume, eventItem);
    AddPriceField("off2", _topprice.Second_best_offer_price, eventItem);
    AddVolumeField("offvol2", _topprice.Second_best_offer_volume, eventItem);
    AddPriceField("off3", _topprice.Third_best_offer_price, eventItem);
    AddVolumeField("offvol3", _topprice.Third_best_offer_volume, eventItem);
    }
    return eventItem;
    }
    }
    private static void AddPriceField(string fld, int val100, IDictionary target)
    {
    double v = (((double)val100) / 1000);
    if (v != 0)
    target[fld] = v.ToString().Replace(',', '.');
    }
    private static void AddVolumeField(string fld, int val100, IDictionary target)
    {
    double v = (((double)val100) / 100);
    if (v != 0)
    target[fld] = v.ToString();
    }
    private static void AddVolumeField(string fld, double val100, IDictionary target)
    {
    double v = (((double)val100) / 100);
    if (v != 0)
    target[fld] = v.ToString();
    }
    }
    }

  2. #2
    Administrator
    Join Date
    Jul 2006
    Location
    Milan
    Posts
    1,091
    At first sight, I can't find any blocking operation that could affect "subscribe", so I can't understand why those "subscribe" operations are so slow.
    Do you see a steady 100% CPU level on the machine during that period?
    Please, try to replicate the case and produce a thread dump (if possible) on the Remote Server just before the timeout notifications appear on the Proxy Adapter, to see what happens.

  3. #3
    Member
    Join Date
    Mar 2008
    Location
    Hồ Chí Minh
    Posts
    7
    Hi Dario
    Yes. CPU 100% level on the the machine during this period.
    I think, may be I use many threads in process, hence I get broplem in "block" between threads
    I try many case but I can't understand.

  4. #4
    Administrator
    Join Date
    Jul 2006
    Location
    Milan
    Posts
    1,091
    You should not assume that our example adapters are scalable. We used one thread per item just to make the code simple.

  5. #5
    Member
    Join Date
    Mar 2008
    Location
    Hồ Chí Minh
    Posts
    7
    thanks Dario

 

 

Similar Threads

  1. Replies: 1
    Last Post: April 17th, 2012, 09:19 AM
  2. warn connections, lightstreamer not run
    By quanghung221 in forum Adapter SDKs
    Replies: 1
    Last Post: May 31st, 2010, 09:00 AM
  3. < WARN> Unexpected null event ???
    By mohamida in forum Adapter SDKs
    Replies: 2
    Last Post: December 10th, 2009, 06:57 AM
  4. WARN: Updated discarded for item ?
    By craigtype3 in forum Adapter SDKs
    Replies: 4
    Last Post: August 7th, 2009, 05:47 PM
  5. subscription came to late
    By michaelvb in forum Adapter SDKs
    Replies: 1
    Last Post: May 29th, 2008, 10:07 AM

Bookmarks

Posting Permissions

  • You may not post new threads
  • You may not post replies
  • You may not post attachments
  • You may not edit your posts
  •  
All times are GMT +1. The time now is 07:07 AM.