他比System.Net.Sockets好的地方在於有提供事件。
範例程式載點
Barakbbn.Freeware.Net.Sockets | 複製程式碼(Copy to clipboard) |
using System;
using System.Net;
using System.Net.Sockets;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Threading;
//////////////////////////使用方法///////////////////////////////////////////////////////////////////////////////
//
// 在程式碼上方加入 using Barakbbn.Freeware.Net.Sockets;
//
// private TcpClient socketControl; //宣告
// private TcpListener socketControl; //宣告
//
// 使用範例請參考TCP_Socket19438810272005.zip 這個是原始版本沒做修改
//
//
// 原作:Barakbbn
//
// 修改記錄
// 2008/08/18
// TcpClient及TcpListener底下新增 string Key
// 用來方便陣列化後的事件來源判別
//
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
namespace Barakbbn.Freeware.Net.Sockets
{
public class SocketErrorEventArgs : EventArgs
{
Exception ex;
public SocketErrorEventArgs(Exception ex)
{
this.ex = ex;
}
public bool IsSocketException
{
get { return (this.ex is SocketException); }
}
public Exception Error
{
get { return this.ex; }
}
}
public class SocketDisconnectedEventArgs : EventArgs
{
public enum DisconnectReason { LocalDisconnect, RemoteDisconnected, ConnectionLost }
DisconnectReason reason;
public SocketDisconnectedEventArgs(DisconnectReason reason)
{
this.reason = reason;
}
public DisconnectReason Reason
{
get { return this.reason; }
}
}
public class SocketDataArrivedEventArgs : EventArgs
{
private int recvSize;
private int lostSize;
public SocketDataArrivedEventArgs(int recvSize, int lostSize)
{
this.recvSize = recvSize;
this.lostSize = lostSize;
}
public int ReceivedSize
{
get { return this.recvSize; }
}
public int LostSize
{
get { return this.lostSize; }
}
}
public class SocketDataSendProgressChangedEventArgs : EventArgs
{
private int sentSize;
private int totalSize;
public SocketDataSendProgressChangedEventArgs(int sentSize, int totalSize)
{
this.sentSize = sentSize;
this.totalSize = totalSize;
}
public int SentSize
{
get { return this.sentSize; }
internal protected set { this.sentSize = value; }
}
public int TotalSize
{
get { return this.totalSize; }
internal protected set { this.totalSize = value; }
}
}
public class TcpClient : System.Net.Sockets.TcpClient
{
public string Key;
private delegate int BeginReceiveDelegate(ref byte[] buffer, int offset, int size, bool allSizeOrNothing);
private delegate void BeginDisconnectDelegate();
private delegate int SendWithProgressDelegate(byte[] buffer, int offset, int size, SocketFlags socketFlags);
public class SendProgressConstraints
{
private int minMillisecondsBetweenEvents;
private int chunkSize;
public SendProgressConstraints(int minMilliseconds, int chunkSize)
{
this.minMillisecondsBetweenEvents = minMilliseconds;
this.chunkSize = chunkSize;
}
public int MinMillisecondsBetweenEvents
{
get
{
lock (this)
{
return this.minMillisecondsBetweenEvents;
}
}
set
{
lock (this)
{
if (value < 0) throw new ArgumentOutOfRangeException("MinMillisecondsBetweenEvents", "Value can't be negative");
this.minMillisecondsBetweenEvents = value;
}
}
}
public int ChunkSize
{
get
{
lock (this)
{
return this.chunkSize;
}
}
set
{
lock (this)
{
if (value < 0) throw new ArgumentOutOfRangeException("ChunkSize", "Value can't be negative");
this.chunkSize = value;
}
}
}
}
#region Member Fields
protected const int recvBufferMinimumSize = 8 * 1024;
private bool disposed;
private Barakbbn.Freeware.Collections.CyclicQueue<Byte> recvBuffer;
private AutoResetEvent dataAvailableEvent;
private int recvTimeout;
private int dataAvailableMinimumCount;
private Exception socketAsyncRecvError;
private Thread socketPumpThread;
private UInt32 keepAliveInterval;
private DateTime userLastRecvTime;
private SendProgressConstraints sendProgressAttributes;
private TcpClientStream ownerStream;
#endregion
#region Events
public virtual event EventHandler DataAvailable;
public virtual event EventHandler<SocketDataArrivedEventArgs> DataArrived;
public virtual event EventHandler<SocketDataArrivedEventArgs> DataLost;
new public virtual event EventHandler Connected;
public virtual event EventHandler<SocketDisconnectedEventArgs> Disconnected;
public virtual event EventHandler<SocketDataSendProgressChangedEventArgs> SendProgressChanged;
public virtual event EventHandler SendCompleted;
public virtual event EventHandler<SocketErrorEventArgs> Error;
#endregion
#region Lifetime
public TcpClient()
: base()
{
InitInstance();
}
public TcpClient(AddressFamily family)
: base(family)
{
InitInstance();
}
public TcpClient(IPEndPoint localEP)
: base(localEP)
{
InitInstance();
}
public TcpClient(string hostname, int port)
: base(hostname, port)
{
InitInstance();
}
private void InitInstance()
{
this.recvBuffer = new Barakbbn.Freeware.Collections.CyclicQueue<Byte>(Math.Max(base.ReceiveBufferSize, TcpClient.recvBufferMinimumSize));
this.disposed = false;
this.dataAvailableEvent = new AutoResetEvent(false);
dataAvailableMinimumCount = 1; //1 Byte
this.recvTimeout = base.ReceiveTimeout;
//base.ReceiveTimeout = this.recvTimeout = 5000;
this.socketAsyncRecvError = null;
this.userLastRecvTime = DateTime.Now;
this.ownerStream = null;
//Set keep alive for 10 seconds
this.KeepAliveInterval = 10000;
this.sendProgressAttributes = new SendProgressConstraints(100, 8 * 1024);
}
override protected void Dispose(bool disposing)
{
if (!this.disposed)
{
// If disposing equals true, dispose all managed and unmanaged resources.
if (disposing)
{
// Dispose managed resources.
StopSocketPumpThread();
bool isConnectd = this.IsConnected;
ReleaseSocket();
this.dataAvailableEvent.Close();
this.dataAvailableEvent = null;
if (isConnectd) OnDisconnected(new SocketDisconnectedEventArgs(SocketDisconnectedEventArgs.DisconnectReason.LocalDisconnect));
((IDisposable)this.recvBuffer).Dispose();
}
// Release unmanaged resources. (If disposing is false, only the following code is executed.)
this.disposed = true;
}
base.Dispose(disposing);
}
#endregion
#region Events Raising
protected virtual void OnDataAvailable(EventArgs e)
{
if (this.DataAvailable != null) this.DataAvailable(this, e);
}
protected virtual void OnDataArrived(SocketDataArrivedEventArgs e)
{
this.OnDataArrived(e, false);
}
protected virtual void OnDataArrived(SocketDataArrivedEventArgs e, bool async)
{
if (this.DataArrived == null) return;
if (async)
this.DataArrived.BeginInvoke(this, e, null, null);
else
this.DataArrived(this, e);
}
protected virtual void OnDataLost(SocketDataArrivedEventArgs e)
{
if (this.DataLost != null) this.DataLost(this, e);
}
protected virtual void OnConnected(EventArgs e)
{
if (this.Connected != null) this.Connected(this, e);
}
protected virtual void OnDisconnected(SocketDisconnectedEventArgs e)
{
if (this.Disconnected != null) this.Disconnected(this, e);
}
protected virtual void OnSendProgressChanged(SocketDataSendProgressChangedEventArgs e)
{
if (this.SendProgressChanged != null) this.SendProgressChanged(this, e);
}
protected virtual void OnSendCompleted(EventArgs e)
{
if (this.SendCompleted != null) this.SendCompleted(this, e);
}
protected virtual void OnError(SocketErrorEventArgs e)
{
this.OnError(e, false);
}
protected virtual void OnError(SocketErrorEventArgs e, bool async)
{
if (this.Error == null) return;
if (async)
this.Error.BeginInvoke(this, e, null, null);
else
this.Error(this, e);
}
#endregion
#region Properties
protected new virtual bool Active
{
get { return base.Active; }
set { base.Active = value; }
}
#region Public Properties
public new virtual int Available
{
get
{
lock (this.recvBuffer)
{
return this.recvBuffer.Count;
}
}
}
public new virtual Socket Client
{
get { return base.Client; }
set
{
if (base.Client == value) return;
StopSocketPumpThread();
ReleaseSocket();
base.Client = value;
if (value.Connected)
{
this.Active = true;
StartSocketPumpThread();
}
}
}
public virtual bool IsConnected
{
get { return base.Connected; }
}
public new virtual bool ExclusiveAddressUse
{
get { return base.ExclusiveAddressUse; }
set { base.ExclusiveAddressUse = value; }
}
public new virtual LingerOption LingerState
{
get { return base.LingerState; }
set { base.LingerState = value; }
}
public new virtual bool NoDelay
{
get { return base.NoDelay; }
set { base.NoDelay = value; }
}
public new virtual int ReceiveBufferSize
{
get { return this.Client.ReceiveBufferSize; }
set
{
lock (this.recvBuffer)
{
this.Client.ReceiveBufferSize = value;
}
}
}
public new virtual int ReceiveTimeout
{
get { return this.recvTimeout; }
set { this.recvTimeout = value; }
}
public new virtual int SendBufferSize
{
get { return base.SendBufferSize; }
set { base.SendBufferSize = value; }
}
public new virtual int SendTimeout
{
get { return base.SendTimeout; }
set { base.SendTimeout = value; }
}
public virtual UInt32 KeepAliveInterval
{
get
{
return this.keepAliveInterval;
}
set
{
byte[] inBuffer = new byte[12];
if (value == 0)
{
BitConverter.GetBytes((UInt32)0).CopyTo(inBuffer, 0);
BitConverter.GetBytes((UInt32)75000).CopyTo(inBuffer, 4);
BitConverter.GetBytes((UInt32)7200000).CopyTo(inBuffer, 8);
}
else
{
BitConverter.GetBytes((UInt32)1).CopyTo(inBuffer, 0);
BitConverter.GetBytes(value).CopyTo(inBuffer, 4); //Interval
BitConverter.GetBytes((UInt32)200).CopyTo(inBuffer, 8); //keepalive mechanism will kick in xxx ms after last send/receive or connected
}
this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, value != 0);
this.Client.IOControl(IOControlCode.KeepAliveValues, inBuffer, null);
this.keepAliveInterval = value;
}
}
public virtual IPEndPoint LocalEndPoint
{
get { return this.Client.LocalEndPoint as IPEndPoint; }
}
public virtual IPEndPoint RemoteEndPoint
{
get { return this.Client.RemoteEndPoint as IPEndPoint; }
}
public SendProgressConstraints SendProgressAttributes
{
get { return this.sendProgressAttributes; }
}
#endregion
#endregion
#region Methods
private void StartSocketPumpThread()
{
this.socketPumpThread = new System.Threading.Thread(new System.Threading.ThreadStart(this.SocketPumpThreadProc));
this.socketPumpThread.IsBackground = true;
this.socketPumpThread.Start();
}
private void StopSocketPumpThread()
{
if (this.socketPumpThread != null && this.socketPumpThread.IsAlive)
{
this.socketPumpThread.Abort();
}
this.socketPumpThread = null;
}
private void SocketPumpThreadProc()
{
int recvCount, beforeRecvCount, lostCount, idleCount = 0, errCount = 0;
ArraySegment<byte>[] recvBuffers;
bool becomeAvailable = false;
ResetReceiveBuffer();
//---------------------------------------------
while (this.IsConnected)
{
#region try
try
{
if (!this.Client.Poll(7000000, SelectMode.SelectRead))
{
//Do some maintenance on the recvBuffer
idleCount = (DoMaintenanceOnRecvIdle(idleCount * 7000, (int)Math.Min((long)(DateTime.Now - this.userLastRecvTime).TotalMilliseconds, (long)int.MaxValue))) ? 0 : idleCount + 1;
continue;
}
//-------------------------------------
//increase receive buffer if needed
errCount = idleCount = lostCount = recvCount = 0; //Reset some variables
lock (this.recvBuffer)
{
ResizeRecvBuffer(true); //Increase internal recvieve buffer if needed
beforeRecvCount = this.recvBuffer.Count; //Remeber current size of available data
recvBuffers = this.recvBuffer.AcquireInternalBufferForWriting(false); //Get receive buffer internal buffer, locked for writing
try
{
recvCount = this.Client.Receive(recvBuffers);
}
finally
{
if (this.recvBuffer.ReleaseAcquiredInternalBuffer(recvCount))
{
lostCount = recvCount + beforeRecvCount - this.recvBuffer.Count;
}
}
}
if (recvCount == 0)
{
InternalDisconnect(SocketDisconnectedEventArgs.DisconnectReason.RemoteDisconnected);
}
else if (this.IsConnected && recvCount > 0)
{
becomeAvailable = (beforeRecvCount == 0);
lock (this.dataAvailableEvent)
{
if (this.dataAvailableMinimumCount <= recvCount + beforeRecvCount) this.dataAvailableEvent.Set();
}
if (becomeAvailable) OnDataAvailable(EventArgs.Empty);
HandleArrivedDataEvents(recvCount, lostCount);
}
}
#endregion
//-------------------------------------------------------
#region catch
catch (System.Threading.ThreadInterruptedException ex)
{
continue;
}
catch (SocketException ex)
{
switch (ex.SocketErrorCode)
{
case SocketError.TimedOut:
break;
case SocketError.ConnectionReset:
InternalDisconnect(SocketDisconnectedEventArgs.DisconnectReason.ConnectionLost); //Will also stop this thread
break;
default:
if (errCount > 0) throw;
errCount++;
this.socketAsyncRecvError = ex;
OnError(new SocketErrorEventArgs(ex), true);
break;
}
}
catch (ThreadAbortException ex)
{
//Terminating
}
catch (Exception ex)
{
throw;
}
#endregion
}
}
private void ResetReceiveBuffer()
{
lock (this.recvBuffer)
{
this.recvBuffer.Clear();
this.recvBuffer.Capacity = TcpClient.recvBufferMinimumSize;
this.dataAvailableEvent.Reset();
}
}
private bool ResizeRecvBuffer(bool increase)
{
if (increase)
{
if (this.recvBuffer.Capacity < this.ReceiveBufferSize && base.Available > this.recvBuffer.FreeSpace)
{
this.recvBuffer.Capacity = (int)Math.Min((long)this.ReceiveBufferSize, 2 * (long)this.recvBuffer.Capacity);
return true;
}
}
else
{
if (this.recvBuffer.Capacity / 2 >= this.recvBuffer.Count && this.recvBuffer.Capacity > TcpClient.recvBufferMinimumSize)
{
this.recvBuffer.Capacity = Math.Max(this.recvBuffer.Count, TcpClient.recvBufferMinimumSize);
return true;
}
}
return false;
}
private bool DoMaintenanceOnRecvIdle(int socketRecvIdleMilliseconds, int userRecvIdleMilliseconds)
{
if (Math.Min(socketRecvIdleMilliseconds, userRecvIdleMilliseconds) > 20000)
{
return ResizeRecvBuffer(false);
}
return false;
}
private void HandleArrivedDataEvents(int dataSize, int lostDataSize)
{
if (this.DataLost == null && this.DataArrived == null) return;
SocketDataArrivedEventArgs e = new SocketDataArrivedEventArgs(dataSize, lostDataSize);
if (lostDataSize > 0) OnDataLost(e);
OnDataArrived(e, true);
}
private void ReleaseSocket()
{
//Using base instead of this to access the wrapped socket, is used to avoid another call to ReleaseSocket
if (base.Client != null)
{
if (base.Client.Connected) this.Client.Shutdown(SocketShutdown.Both);
base.Client.Close();
base.Client = null;
base.Active = false;
}
}
private void RecreateSocket()
{
Socket newSocket = CloneSocket(base.Client);
//Using 'base' instead of 'this' to access the wrapped socket, is used to avoid another call to ReleaseSocket
ReleaseSocket();
base.Active = false; //indicates to the base TcpClient that it's not connected
base.Client = newSocket;
this.KeepAliveInterval = this.keepAliveInterval;
}
private Socket CloneSocket(Socket sock)
{
Socket newSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
newSocket.Blocking = sock.Blocking;
newSocket.DontFragment = sock.DontFragment;
newSocket.ExclusiveAddressUse = sock.ExclusiveAddressUse;
newSocket.LingerState.Enabled = sock.LingerState.Enabled;
newSocket.LingerState.LingerTime = sock.LingerState.LingerTime;
newSocket.NoDelay = sock.NoDelay;
newSocket.ReceiveBufferSize = sock.ReceiveBufferSize;
newSocket.ReceiveTimeout = sock.ReceiveTimeout;
newSocket.SendBufferSize = sock.SendBufferSize;
newSocket.SendTimeout = sock.SendTimeout;
newSocket.Ttl = sock.Ttl;
newSocket.UseOnlyOverlappedIO = sock.UseOnlyOverlappedIO;
return newSocket;
}
private void SetRecvDataAvailableMinimumSize(int size)
{
lock (this.dataAvailableEvent)
{
this.dataAvailableMinimumCount = size;
if (this.Available < this.dataAvailableMinimumCount)
{
this.dataAvailableEvent.Reset();
}
else
{
this.dataAvailableEvent.Set();
}
}
}
private void InternalDisconnect(SocketDisconnectedEventArgs.DisconnectReason reason)
{
//Methods which wants to prevent the event raising and operation done here (OnDisconnected),
//incase the socket is not connected, should check IsConnected before calling this method
try
{
if (Thread.CurrentThread != this.socketPumpThread) StopSocketPumpThread(); //prevent call from the pump thread to kill itself
RecreateSocket();
OnDisconnected(new SocketDisconnectedEventArgs(reason));
}
catch (SocketException e)
{
this.OnError(new SocketErrorEventArgs(e), true);
throw;
}
}
private int InternalReceive(ref byte[] buffer, int offset, int size, bool allSizeOrNothing)
{
int res = 0;
if (this.socketAsyncRecvError != null)
{
try
{
throw this.socketAsyncRecvError;
}
finally { this.socketAsyncRecvError = null; }
}
try
{
//If disconnected, and not enough data to read, throw exception
if (!this.IsConnected && (this.Available == 0 || allSizeOrNothing && size > this.Available))
{
throw new SocketException((int)SocketError.NotConnected);
}
SetRecvDataAvailableMinimumSize((allSizeOrNothing) ? size : 1);
if (this.dataAvailableEvent.WaitOne(this.recvTimeout, true))
{
lock (this.recvBuffer)
{
if (buffer == null)
{
buffer = this.recvBuffer.Dequeue(size);
res = buffer.Length;
}
else
{
res = this.recvBuffer.Dequeue(buffer, size);
}
this.userLastRecvTime = DateTime.Now;
}
if (this.Available > 0) this.dataAvailableEvent.Set();
return res;
}
throw new SocketException((int)SocketError.TimedOut);
}
catch (SocketException e)
{
this.OnError(new SocketErrorEventArgs(e), true);
return (-1);
//throw;
}
}
protected virtual int SendWithProgress(byte[] buffer, int offset, int size, SocketFlags socketFlags)
{
SocketDataSendProgressChangedEventArgs e = new SocketDataSendProgressChangedEventArgs(0, size);
TimeSpan ellapsedTime;
DateTime time1, time2;
int sendSize;
time1 = DateTime.Now;
while (size > 0)
{
sendSize = Math.Min(this.sendProgressAttributes.ChunkSize, size);
sendSize = this.Client.Send(buffer, offset, sendSize, socketFlags);
offset += sendSize;
size -= sendSize;
e.SentSize += sendSize;
ellapsedTime = (time2 = DateTime.Now) - time1;
if (ellapsedTime.TotalMilliseconds >= this.sendProgressAttributes.MinMillisecondsBetweenEvents || size == 0)
{
OnSendProgressChanged(e);
time1 = time2;
}
}
return e.SentSize;
}
private void PostConnect()
{
StartSocketPumpThread();
OnConnected(EventArgs.Empty);
}
#region Public Methods
public new virtual TcpClientStream GetStream()
{
if (this.ownerStream == null)
{
this.ownerStream = new TcpClientStream(this, true);
}
return this.ownerStream;
}
public new virtual IAsyncResult BeginConnect(IPAddress address, int port, AsyncCallback requestCallback, object state)
{
return base.BeginConnect(address, port, requestCallback, state);
}
public new virtual IAsyncResult BeginConnect(IPAddress[] addresses, int port, AsyncCallback requestCallback, object state)
{
return base.BeginConnect(addresses, port, requestCallback, state);
}
public new virtual IAsyncResult BeginConnect(string host, int port, AsyncCallback requestCallback, object state)
{
return base.BeginConnect(host, port, requestCallback, state);
}
public new virtual void Connect(IPEndPoint remoteEP)
{
try
{
base.Connect(remoteEP);
PostConnect();
}
catch (SocketException e)
{
this.OnError(new SocketErrorEventArgs(e), true);
throw;
}
}
public new virtual void Connect(IPAddress address, int port)
{
try
{
base.Connect(address, port);
PostConnect();
}
catch (SocketException e)
{
this.OnError(new SocketErrorEventArgs(e), true);
throw;
}
}
public new virtual void Connect(IPAddress[] ipAddresses, int port)
{
try
{
base.Connect(ipAddresses, port);
PostConnect();
}
catch (SocketException e)
{
this.OnError(new SocketErrorEventArgs(e), true);
throw;
}
}
public new virtual void Connect(string hostname, int port)
{
try
{
base.Connect(hostname, port);
PostConnect();
}
catch (SocketException e)
{
this.OnError(new SocketErrorEventArgs(e), true);
throw;
}
}
public new virtual void EndConnect(IAsyncResult asyncResult)
{
base.EndConnect(asyncResult);
//this.Client.BeginReceive(this.tempRecvBuffer, 0, this.tempRecvBuffer.Length, SocketFlags.None, new AsyncCallback(this.PumpInSocket), this.tempRecvBuffer);
StartSocketPumpThread();
OnConnected(EventArgs.Empty);
}
public virtual IAsyncResult BeginReceive(AsyncCallback callback, object state)
{
byte[] buffer = null;
return (new BeginReceiveDelegate(this.InternalReceive)).BeginInvoke(ref buffer, 0, int.MaxValue, false, callback, state);
}
public virtual IAsyncResult BeginReceive(byte[] buffer, AsyncCallback callback, object state)
{
return this.BeginReceive(buffer, 0, (buffer == null) ? 0 : buffer.GetLength(0), false, callback, state);
}
public virtual IAsyncResult BeginReceive(byte[] buffer, int offset, int size, AsyncCallback callback, object state)
{
return this.BeginReceive(buffer, offset, size, false, callback, state);
}
public virtual IAsyncResult BeginReceive(byte[] buffer, int offset, int size, bool allSizeOrNothing, AsyncCallback callback, object state)
{
if (buffer == null)
{
throw new ArgumentNullException("buffer");
}
if ((offset < 0) || (offset > buffer.Length))
{
throw new ArgumentOutOfRangeException("offset");
}
if ((size < 0) || (size > (buffer.Length - offset)))
{
throw new ArgumentOutOfRangeException("size");
}
return (new BeginReceiveDelegate(this.InternalReceive)).BeginInvoke(ref buffer, offset, size, allSizeOrNothing, callback, state);
}
public virtual int Receive(byte[] buffer)
{
return this.Receive(buffer, 0, (buffer == null) ? 0 : buffer.GetLength(0), false);
}
public virtual int Receive(byte[] buffer, int offset, int size)
{
return Receive(buffer, offset, size, false);
}
public virtual int Receive(byte[] buffer, int offset, int size, bool allSizeOrNothing)
{
if (buffer == null)
{
throw new ArgumentNullException("buffer");
}
if ((offset < 0) || (offset > buffer.Length))
{
throw new ArgumentOutOfRangeException("offset");
}
if ((size < 0) || (size > (buffer.Length - offset)))
{
throw new ArgumentOutOfRangeException("size");
}
return InternalReceive(ref buffer, offset, size, allSizeOrNothing);
}
public virtual byte[] Receive()
{
byte[] buffer = null;
InternalReceive(ref buffer, 0, int.MaxValue, false);
return buffer;
}
public virtual int EndReceive(IAsyncResult asyncResult)
{
byte[] buffer = null;
System.Runtime.Remoting.Messaging.AsyncResult delegateAsyncResult = asyncResult as System.Runtime.Remoting.Messaging.AsyncResult;
if (delegateAsyncResult == null) throw new ArgumentException("Incorrect type of Async Result");
return ((BeginReceiveDelegate)delegateAsyncResult.AsyncDelegate).EndInvoke(ref buffer, asyncResult);
}
public virtual Byte[] EndReceive(out int count, IAsyncResult asyncResult)
{
byte[] buffer = null;
System.Runtime.Remoting.Messaging.AsyncResult delegateAsyncResult = asyncResult as System.Runtime.Remoting.Messaging.AsyncResult;
if (delegateAsyncResult == null) throw new ArgumentException("Incorrect type of Async Result");
count = ((BeginReceiveDelegate)delegateAsyncResult.AsyncDelegate).EndInvoke(ref buffer, asyncResult);
return buffer;
}
public virtual IAsyncResult BeginSend(byte[] buffer, AsyncCallback callback, object state)
{
if (this.SendProgressChanged != null)
{
return (new SendWithProgressDelegate(this.SendWithProgress)).BeginInvoke(buffer, 0, (buffer != null) ? buffer.GetLength(0) : 0, SocketFlags.None, callback, state);
}
else
{
return this.Client.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, callback, state);
}
}
public virtual IAsyncResult BeginSend(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state)
{
if (this.SendProgressChanged != null)
{
return (new SendWithProgressDelegate(this.SendWithProgress)).BeginInvoke(buffer, offset, size, socketFlags, callback, state);
}
else
{
return this.Client.BeginSend(buffer, offset, size, socketFlags, callback, state);
}
}
public virtual int Send(byte[] buffer)
{
int res;
try
{
if (this.SendProgressChanged != null)
{
res = SendWithProgress(buffer, 0, (buffer != null) ? buffer.GetLength(0) : 0, SocketFlags.None);
}
else
{
res = this.Client.Send(buffer);
}
OnSendCompleted(EventArgs.Empty);
}
catch (SocketException e)
{
this.OnError(new SocketErrorEventArgs(e), true);
throw;
}
return res;
}
public virtual int Send(byte[] buffer, int offset, int size)
{
int res;
try
{
if (this.SendProgressChanged != null)
{
res = SendWithProgress(buffer, offset, size, SocketFlags.None);
}
else
{
res = this.Client.Send(buffer, offset, size, SocketFlags.None);
}
OnSendCompleted(EventArgs.Empty);
}
catch (SocketException e)
{
this.OnError(new SocketErrorEventArgs(e), true);
throw;
}
return res;
}
public virtual int EndSend(IAsyncResult asyncResult)
{
int res;
if (asyncResult is System.Runtime.Remoting.Messaging.AsyncResult
&& ((System.Runtime.Remoting.Messaging.AsyncResult)asyncResult).AsyncDelegate is SendWithProgressDelegate)
{
res = ((SendWithProgressDelegate)((System.Runtime.Remoting.Messaging.AsyncResult)asyncResult).AsyncDelegate).EndInvoke(asyncResult);
}
else
{
res = this.Client.EndSend(asyncResult);
}
OnSendCompleted(EventArgs.Empty);
return res;
}
public virtual void Disconnect()
{
if (!this.IsConnected) return;
InternalDisconnect(SocketDisconnectedEventArgs.DisconnectReason.LocalDisconnect);
}
public virtual IAsyncResult BeginDisconnect(AsyncCallback callback, object state)
{
return (new BeginDisconnectDelegate(this.Disconnect)).BeginInvoke(callback, state);
}
public virtual void EndDisconnect(IAsyncResult asyncResult)
{
System.Runtime.Remoting.Messaging.AsyncResult delegateAsyncResult = asyncResult as System.Runtime.Remoting.Messaging.AsyncResult;
if (delegateAsyncResult == null) throw new ArgumentException("Incorrect type of Async Result");
((BeginDisconnectDelegate)delegateAsyncResult.AsyncDelegate).EndInvoke(asyncResult);
}
#endregion
#endregion
}
public class TcpListener : IDisposable
{
public string Key;
private bool disposed;
private System.Net.Sockets.Socket socket;
private IPEndPoint localEndPoint;
private bool exclusiveAddressUse;
#region Events
public event EventHandler ConnectionRequest;
#endregion
public TcpListener() : this(0) { }
public TcpListener(int port) : this(IPAddress.Any, port) { }
public TcpListener(IPAddress localaddr, int port) : this(new IPEndPoint(localaddr, port)) { }
public TcpListener(IPEndPoint localEP)
{
InitInstance(localEP);
}
private void InitInstance(IPEndPoint localEP)
{
this.disposed = false;
this.localEndPoint = localEP;
this.socket = new System.Net.Sockets.Socket(localEP.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
this.exclusiveAddressUse = false;
}
virtual protected void Dispose(bool disposing)
{
if (!this.disposed)
{
// If disposing equals true, dispose all managed and unmanaged resources.
if (disposing)
{
// Dispose managed resources.
this.socket.Close();
}
// Release unmanaged resources. (If disposing is false, only the following code is executed.)
this.disposed = true;
}
}
private void MonitorConnectionRequests()
{
ThreadPool.QueueUserWorkItem(new WaitCallback(this.ConnectionRequestsMonitorProc), this.socket);
}
private void ConnectionRequestsMonitorProc(object state)
{
try
{
while (this.IsListening && !this.socket.Poll(7000000, SelectMode.SelectRead)) ;
if (this.IsListening && this.socket == (System.Net.Sockets.Socket)state) OnConnectionRequest(EventArgs.Empty);
}
catch (SocketException ex)
{
}
catch (ThreadInterruptedException ex)
{
}
catch (ThreadAbortException ex)
{
}
catch (Exception ex)
{
}
}
#region Events Raising
protected virtual void OnConnectionRequest(EventArgs e)
{
if (this.ConnectionRequest != null) this.ConnectionRequest(this, e);
}
#endregion
#region Public Properties
virtual public bool ExclusiveAddressUse
{
get { return this.socket.ExclusiveAddressUse; }
set
{
this.socket.ExclusiveAddressUse = value;
this.exclusiveAddressUse = value;
}
}
virtual public IPEndPoint LocalEndPoint
{
get { return this.localEndPoint; }
protected set { this.localEndPoint = value; }
}
virtual public System.Net.Sockets.Socket Server
{
get { return this.socket; }
}
virtual public bool IsListening
{
get { return ((int)this.socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.AcceptConnection) != 0) ? true : false; }
}
virtual public bool Pending
{
get
{
if (!this.IsListening) return false;
return this.socket.Poll(0, SelectMode.SelectRead);
}
}
#endregion
#region Public Methods
virtual public System.Net.Sockets.Socket AcceptSocket()
{
System.Net.Sockets.Socket res = this.socket.Accept();
MonitorConnectionRequests();
return res;
}
virtual public TcpClient AcceptTcpClient()
{
System.Net.Sockets.Socket socket = this.AcceptSocket();
TcpClient res = new TcpClient();
res.Client = socket;
return res;
}
virtual public IAsyncResult BeginAcceptSocket(AsyncCallback callback, object state)
{
return this.socket.BeginAccept(callback, state);
}
virtual public IAsyncResult BeginAcceptTcpClient(AsyncCallback callback, object state)
{
return this.BeginAcceptSocket(callback, state);
}
virtual public System.Net.Sockets.Socket EndAcceptSocket(IAsyncResult asyncResult)
{
System.Net.Sockets.Socket res = this.socket.EndAccept(asyncResult);
MonitorConnectionRequests();
return res;
}
virtual public TcpClient EndAcceptTcpClient(IAsyncResult asyncResult)
{
System.Net.Sockets.Socket socket = this.EndAcceptSocket(asyncResult);
TcpClient res = new TcpClient();
res.Client = socket;
return res;
}
private void InternalStart()
{
if (this.IsListening)
{
if (this.localEndPoint.Equals(this.socket.LocalEndPoint)) return; //Already listening on the requested end point
this.Stop();
}
this.socket.Bind(this.LocalEndPoint);
this.socket.Listen(5);
MonitorConnectionRequests();
}
virtual public void Start()
{
InternalStart();
}
virtual public void Start(int port)
{
this.localEndPoint.Port = port;
InternalStart();
}
virtual public void Start(IPAddress localaddr, int port)
{
this.localEndPoint.Address = localaddr;
this.localEndPoint.Port = port;
InternalStart();
}
virtual public void Start(IPEndPoint localEP)
{
this.localEndPoint = localEP;
InternalStart();
}
virtual public void Stop()
{
this.socket.Close();
this.socket = new System.Net.Sockets.Socket(this.localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
this.socket.ExclusiveAddressUse = this.exclusiveAddressUse;
}
#endregion
#region IDisposable Members
void IDisposable.Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
#endregion
}
public class TcpClientStream : Stream
{
#region Member Fields
private TcpClient tcpClient;
bool ownsSocket;
bool disposed;
#endregion
#region Lifetime
public TcpClientStream(TcpClient tcpClient)
: this(tcpClient, false)
{
}
public TcpClientStream(TcpClient tcpClient, bool ownsSocket)
{
InitInstance(tcpClient, ownsSocket);
}
private void InitInstance(TcpClient tcpClient, bool ownsSocket)
{
if (tcpClient == null) throw new ArgumentNullException("tcpClient");
this.tcpClient = tcpClient;
this.ownsSocket = ownsSocket;
this.disposed = false;
}
protected override void Dispose(bool disposing)
{
if (!this.disposed)
{
// If disposing equals true, dispose all managed and unmanaged resources.
if (disposing)
{
// Dispose managed resources.
if (this.ownsSocket)
{
this.TcpClient.Close();
}
}
// Release unmanaged resources. (If disposing is false, only the following code is executed.)
this.disposed = true;
}
base.Dispose(disposing);
}
#endregion
#region Public Properties
public virtual TcpClient TcpClient
{
get { return this.tcpClient; }
}
#endregion
#region Public Methods
public virtual IAsyncResult BeginRead(AsyncCallback callback, object state)
{
return this.TcpClient.BeginReceive(callback, state);
}
public virtual IAsyncResult BeginRead(byte[] buffer, AsyncCallback callback, object state)
{
return this.BeginRead(buffer, 0, (buffer == null) ? 0 : buffer.GetLength(0), callback, state);
}
public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, bool allCountOrNothing, AsyncCallback callback, object state)
{
return this.TcpClient.BeginReceive(buffer, offset, count, allCountOrNothing, callback, state);
}
public virtual int Read(byte[] buffer)
{
return this.Read(buffer, 0, (buffer == null) ? 0 : buffer.GetLength(0), false);
}
public virtual int Read(byte[] buffer, int offset, int count, bool allCountOrNothing)
{
try
{
return this.TcpClient.Receive(buffer, offset, count, allCountOrNothing);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.TimedOut) throw new TimeoutException("Timeout while reading from socket", ex);
throw;
}
}
public virtual byte[] Read()
{
try
{
return this.TcpClient.Receive();
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.TimedOut) throw new TimeoutException("Timeout while reading from socket", ex);
throw;
}
}
public virtual Byte[] EndRead(out int count, IAsyncResult asyncResult)
{
try
{
return this.TcpClient.EndReceive(out count, asyncResult);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.TimedOut) throw new TimeoutException("Timeout while reading from socket", ex);
throw;
}
}
public virtual IAsyncResult BeginWrite(byte[] buffer, AsyncCallback callback, object state)
{
return this.BeginWrite(buffer, 0, (buffer != null) ? buffer.GetLength(0) : 0, callback, state);
}
public virtual void Write(byte[] buffer)
{
this.Write(buffer, 0, (buffer != null) ? buffer.GetLength(0) : 0);
}
#region Stream overrides
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return true; }
}
public override void Flush()
{
}
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
return this.Read(buffer, offset, count, false);
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
try
{
this.TcpClient.Send(buffer, offset, count);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.TimedOut) throw new TimeoutException("Timeout while writing to socket", ex);
throw;
}
}
public override bool CanTimeout
{
get { return true; }
}
public override int ReadTimeout
{
get
{
return this.TcpClient.ReceiveTimeout;
}
set
{
this.TcpClient.ReceiveTimeout = value;
}
}
public override int WriteTimeout
{
get
{
return this.TcpClient.SendTimeout;
}
set
{
this.TcpClient.SendTimeout = value;
}
}
#endregion
#endregion
}
} //namespace System.Net.Sockets.Customized
namespace Barakbbn.Freeware.Collections
{
public class CyclicQueue<T> : IDisposable
{
#region Member Fields
private bool disposed;
private T[] data;
private int readIndex, writeIndex;
private bool isFull;
private Mutex dataLock;
#endregion
#region Lifetime
public CyclicQueue(int capacity)
{
InitInstance(capacity);
}
protected virtual void Dispose(bool disposing)
{
if (!this.disposed)
{
// If disposing equals true, dispose all managed and unmanaged resources.
if (disposing)
{
// Dispose managed resources.
this.dataLock.Close();
}
// Release unmanaged resources. (If disposing is false, only the following code is executed.)
this.disposed = true;
}
}
private void InitInstance(int capacity)
{
this.disposed = false;
this.readIndex = this.writeIndex = 0;
this.isFull = false;
this.data = new T[capacity];
this.dataLock = new Mutex(false);
}
#endregion
protected virtual int CalcCyclicIndex(int fromIndex, int offset)
{
int res = (int)((fromIndex + (long)offset) % this.Capacity);
if (res < 0) res += this.Capacity;
return res;
}
//private void AlignQueue()
//{
// RotateQueue(this.readIndex, false);
//}
//private void RotateQueue(int count, bool rightSide)
//{
// count = Math.Abs(count) % this.Capacity;
// if (count == 0) return;
// T[] tmp = new T[Math.Min(count, this.Capacity - count)];
// if (rightSide) count = this.Capacity - count; //adjust to left rotation
// if (count < this.Capacity - count)
// {
// LockInternalQueue();
// try
// {
// Array.Copy(this.data, 0, tmp, 0, count);
// Array.Copy(this.data, count, this.data, 0, this.Capacity - count);
// Array.Copy(tmp, 0, this.data, this.Capacity - count, count);
// }
// finally
// {
// UnlockInternalQueue();
// }
// }
// else
// {
// LockInternalQueue();
// try
// {
// Array.Copy(this.data, count, tmp, 0, tmp.Length);
// Array.Copy(this.data, 0, this.data, tmp.Length, count);
// Array.Copy(tmp, 0, this.data, 0, tmp.Length);
// }
// finally
// {
// UnlockInternalQueue();
// }
// }
// this.readIndex = CalcCyclicIndex(this.readIndex, -count);
// this.writeIndex = CalcCyclicIndex(this.writeIndex, -count);
//}
private ArraySegment<T>[] InternalGetWriteSegments(bool allowOverwrite)
{
ArraySegment<T>[] res = new ArraySegment<T>[2];
if (allowOverwrite)
{
res[0] = new ArraySegment<T>(this.data, this.writeIndex, this.Capacity - this.writeIndex);
res[1] = new ArraySegment<T>(this.data, 0, this.writeIndex);
}
else
{
if (this.IsFull)
{
res[0] = new ArraySegment<T>(this.data, 0, 0);
res[1] = new ArraySegment<T>(this.data, this.writeIndex, 0);
}
else if (this.writeIndex >= this.readIndex)
{
res[0] = new ArraySegment<T>(this.data, this.writeIndex, this.Capacity - this.writeIndex);
res[1] = new ArraySegment<T>(this.data, 0, this.readIndex);
}
else
{
res[0] = new ArraySegment<T>(this.data, this.writeIndex, this.readIndex - this.writeIndex);
res[1] = new ArraySegment<T>(this.data, this.readIndex, 0);
}
}
return res;
}
protected virtual void ResizeBuffer(int newCapacity)
{
if (newCapacity < 0) throw new ArgumentOutOfRangeException("newCapacity");
lock (this)
{
int count = this.Count;
T[] res = new T[newCapacity];
LockInternalQueue();
try
{
if (count > newCapacity) this.Remove((count = count - newCapacity)); //The logic here, is to discard older data, incase new capacity is smaller
this.Dequeue(res, count);
}
finally
{
UnlockInternalQueue();
}
this.IsFull = (count == newCapacity);
this.writeIndex = count;
this.readIndex = 0;
this.data = res;
}
}
protected virtual void LockInternalQueue()
{
if (!this.dataLock.WaitOne(5000, true)) throw new TimeoutException("Previous call to AcquireInternalBufferForWriting wasn't released using ReleaseAcquiredInternalBuffer");
}
protected virtual void UnlockInternalQueue()
{
this.dataLock.ReleaseMutex();
}
#region Public Properties
public virtual int Capacity
{
get
{
lock (this)
{
return this.data.GetLength(0);
}
}
set
{
if (this.data.GetLength(0) != value)
{
ResizeBuffer(value);
}
}
}
public virtual int Count
{
get
{
int res;
lock (this)
{
res = this.writeIndex - this.readIndex;
if (res < 0 || this.IsFull) res += this.Capacity;
}
return res;
}
}
public virtual bool IsFull
{
get
{
lock (this)
{
return this.isFull;
}
}
protected set { this.isFull = value; }
}
public virtual bool IsEmpty
{
get
{
lock (this)
{
return (this.readIndex == this.writeIndex && !this.isFull);
}
}
}
public virtual int FreeSpace
{
get
{
int res;
lock (this)
{
res = this.readIndex - this.writeIndex;
if (res <= 0 && !this.IsFull) res += this.Capacity;
}
return res;
}
}
#endregion
#region Public Methods
public virtual bool Enqueue(T item)
{
return this.Enqueue(new T[] { item });
}
public virtual bool Enqueue(T[] items)
{
return this.Enqueue(items, (items != null) ? items.GetLength(0) : 0);
}
public virtual bool Enqueue(T[] items, int count) //return: data overwritten?
{
if (count <= 0) return false;
int newFreeSpace;
bool res, isFull;
lock (this)
{
if (items.GetLength(0) < count) count = items.GetLength(0);
newFreeSpace = this.FreeSpace - count;
res = (newFreeSpace < 0); //Is going to overwrite data
isFull = (newFreeSpace <= 0); //Is going to be full
if (count >= this.Capacity)
{
LockInternalQueue();
try
{
Array.Copy(items, count - this.Capacity, this.data, 0, this.Capacity);
}
finally
{
UnlockInternalQueue();
}
this.writeIndex = this.readIndex = 0;
this.IsFull = isFull;
return res;
}
if (count <= this.Capacity - this.writeIndex)
{
LockInternalQueue();
try
{
Array.Copy(items, 0, this.data, this.writeIndex, count);
}
finally
{
UnlockInternalQueue();
}
}
else
{
int size = this.Capacity - this.writeIndex;
LockInternalQueue();
try
{
Array.Copy(items, 0, this.data, this.writeIndex, size);
Array.Copy(items, size, this.data, 0, count - size);
}
finally
{
UnlockInternalQueue();
}
}
this.writeIndex = CalcCyclicIndex(this.writeIndex, count);
if (isFull) this.IsFull = true;
if (res) this.readIndex = this.writeIndex;//Data lost
}
return res;
}
public virtual T Dequeue() { return default(T); }
public virtual T[] Dequeue(int count)
{
if (count < 0) throw new ArgumentOutOfRangeException("count", "Value must be positive");
if (count > this.Count) count = this.Count;
T[] res = new T[count];
this.Dequeue(res, count);
return res;
}
public virtual int Dequeue(T[] items)
{
return this.Dequeue(items, (items != null) ? items.GetLength(0) : 0);
}
public virtual int Dequeue(T[] items, int count)
{
lock (this)
{
count = this.Peek(items, count);
if (count > 0) this.Remove(count);
}
return count;
}
public virtual T Peek()
{
lock (this)
{
if (this.Count == 0) throw new InvalidOperationException("Queue is empty");
return this.Peek(1)[0];
}
}
public virtual T[] Peek(int count)
{
if (count < 0) throw new ArgumentOutOfRangeException("count", "Value must be positive");
if (count > this.Count) count = this.Count;
T[] res = new T[count];
this.Peek(res, count);
return res;
}
public virtual int Peek(T[] items)
{
return this.Peek(items, (items != null) ? items.GetLength(0) : 0);
}
public virtual int Peek(T[] items, int count)
{
if (items == null) throw new ArgumentNullException("items");
if (items.GetLength(0) < count) throw new ArgumentOutOfRangeException("count");
lock (this)
{
if (count > this.Count) count = this.Count;
if (this.Capacity - this.readIndex >= count)
{
LockInternalQueue();
try
{
Array.Copy(this.data, this.readIndex, items, 0, count);
}
finally
{
UnlockInternalQueue();
}
}
else
{
int size = this.Capacity - this.readIndex;
LockInternalQueue();
try
{
Array.Copy(this.data, this.readIndex, items, 0, size);
Array.Copy(this.data, 0, items, size, count - size);
}
finally
{
UnlockInternalQueue();
}
}
}
return count;
}
public virtual void Clear()
{
lock (this)
{
this.writeIndex = this.readIndex = 0;
this.IsFull = false;
}
}
public virtual void Remove(int count)
{
if (count < 0) throw new ArgumentOutOfRangeException("count");
if (count == 0) return;
lock (this)
{
if (count >= this.Count)
{
this.readIndex = this.writeIndex = 0;
}
else
{
this.readIndex = CalcCyclicIndex(this.readIndex, count);
}
this.IsFull = false;
}
}
public virtual ArraySegment<T>[] AcquireInternalBufferForWriting(bool onlyFreeSpace)
{
ArraySegment<T>[] res = null;
lock (this)
{
LockInternalQueue();
try
{
res = InternalGetWriteSegments(!onlyFreeSpace);
}
catch (Exception ex)
{
UnlockInternalQueue();
}
}
return res;
}
public virtual bool ReleaseAcquiredInternalBuffer(int itemsWrittenCount)
{
lock (this)
{
int newFreeSpace;
bool res;
UnlockInternalQueue();
newFreeSpace = this.FreeSpace - itemsWrittenCount; //calc new free space before new writeIndex
this.writeIndex = CalcCyclicIndex(this.writeIndex, itemsWrittenCount);
if (newFreeSpace <= 0) this.IsFull = true;
res = (newFreeSpace < 0);
if (res) this.readIndex = this.writeIndex;//Data lost
return res;
}
}
#endregion
#region IDisposable Members
void IDisposable.Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
#endregion
}
}
5 則留言:
請問範例程式載點好像已經不見了!?
在這邊可以找到http://www.phate.tw/thread-5996-1-2.html
看到了感謝Kelp幫忙^__^
抱歉再度打擾,http://www.phate.tw/thread-5996-1-2.html 提到的範例是使用Socket,而Barakbbn.Freeware.Net.Sockets是採用Tcpclient,因為我看程式註解開頭有寫TCP_Socket19438810272005.zip,所以想問一下這個範例檔案,來學習如何使用KeepAliveValues來控制Timeout重連機器。
Phate.Free.Sockets 是 Barakbbn.Freeware.Net.Sockets 的修正版,Barakbbn.Freeware.Net.Sockets 存在不少的問題,如果想要下載的話可以問Google看看。
C#的Socket物件分成Tcpclient跟Tcpserver,Tcpclient是Socket底下的一部分而已。
張貼留言