Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSONWebSocket仕様の新ニコ実況に対応 #9

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
126 changes: 97 additions & 29 deletions TVTComment/Model/NiconicoUtils/NicoLiveCommentReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Http;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
Expand Down Expand Up @@ -57,6 +59,39 @@ public NicoLiveCommentReceiver(NiconicoLoginSession niconicoLoginSession)
httpClient.DefaultRequestHeaders.TryAddWithoutValidation("User-Agent", ua);
}

/// <summary>
/// KeepAliveコマンドの送信
/// </summary>
/// <exception cref="ObjectDisposedException"></exception>
/// <exception cref="NetworkNicoLiveCommentReceiverException"></exception>
private async void SendBlankAliveMessage(ClientWebSocket ws, [EnumeratorCancellation] CancellationToken cancellationToken)
{
if (ws == null || !WebSocketState.Open.Equals(ws.State))
{
Debug.WriteLine("websocket client is in wrong state.");
return;
}
while (true)
{
try
{
await Task.Delay(60 * 1000, cancellationToken); // 1分待ちます。
await ws.SendAsync(Encoding.UTF8.GetBytes(""), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); //0byteデータ送信
}
catch (Exception e) when (e is ObjectDisposedException || e is SocketException || e is IOException || e is TaskCanceledException)
{
if (cancellationToken.IsCancellationRequested)
return;
if (e is TaskCanceledException)
return;
if (e is ObjectDisposedException)
throw;
else
throw new NetworkNicoLiveCommentReceiverException(e);
}
}
}

/// <summary>
/// 受信した<see cref="NiconicoCommentXmlTag"/>を無限非同期イテレータで返す
/// </summary>
Expand All @@ -73,6 +108,10 @@ public async IAsyncEnumerable<NiconicoCommentXmlTag> Receive(string liveId, [Enu

for (int disconnectedCount = 0; disconnectedCount < 5; ++disconnectedCount)
{
// 万が一接続中断した場合、数秒空いたからリトライする。
var random = new Random();
await Task.Delay((disconnectedCount * 5000) + random.Next(0, 101));

Stream str;
try
{
Expand All @@ -96,26 +135,32 @@ public async IAsyncEnumerable<NiconicoCommentXmlTag> Receive(string liveId, [Enu
throw new InvalidPlayerStatusNicoLiveCommentReceiverException("現在放送されていないか、コミュニティ限定配信のためコメント取得できませんでした");

var threadId = playerStatusRoot.GetProperty("data").GetProperty("rooms")[0].GetProperty("threadId").GetString();
var msUriStr = playerStatusRoot.GetProperty("data").GetProperty("rooms")[0].GetProperty("xmlSocketUri").GetString();
var msUriStr = playerStatusRoot.GetProperty("data").GetProperty("rooms")[0].GetProperty("webSocketUri").GetString();
if (threadId == null || msUriStr == null)
{
throw new InvalidPlayerStatusNicoLiveCommentReceiverException(str.ToString());
}
var msUri = new Uri(msUriStr);
using var tcpClinet = new TcpClient(msUri.Host, msUri.Port);
var socketStream = tcpClinet.GetStream();
using var socketReader = new StreamReader(socketStream, Encoding.UTF8);

using var __ = cancellationToken.Register(() =>
{
socketReader.Dispose(); // socketReader.ReadAsyncを強制終了
});

string body = $"<thread res_from=\"-10\" version=\"20061206\" thread=\"{threadId}\" scores=\"1\" />\0";
// WebSocketAPIに接続
ClientWebSocket ws = new ClientWebSocket();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RequestHeaderにUAを付けるのと、サブプロトコルに「msg.nicovideo.jp#json」を指定した方が良さそうです。

// UAヘッダ追加
var assembly = Assembly.GetExecutingAssembly().GetName();
string version = assembly.Version.ToString(3);
ws.Options.SetRequestHeader("User-Agent", $"TvtComment/{version}");
// SubProtocol追加
ws.Options.AddSubProtocol(WEBSOCKET_PROTOCOL);
// Sec-WebSocket-Versionヘッダ追加
ws.Options.SetRequestHeader("Sec-WebSocket-Extensions", WEBSOCKET_EXTENSIONS);

var uri = new Uri(msUriStr);
await ws.ConnectAsync(uri, cancellationToken);
var buffer = new byte[1024];

// threadId情報を送信
string body = "[{\"ping\":{\"content\":\"rs:0\"}},{\"ping\":{\"content\":\"ps:0\"}},{\"thread\":{\"thread\":\"" + threadId + "\",\"version\":\"20061206\",\"user_id\":\"guest\",\"res_from\":-10,\"with_global\":1,\"scores\":1,\"nicoru\":0}},{\"ping\":{\"content\":\"pf:0\"}},{\"ping\":{\"content\":\"rf:0\"}}]";
byte[] bodyEncoded = Encoding.UTF8.GetBytes(body);
try
{
await socketStream.WriteAsync(bodyEncoded, 0, bodyEncoded.Length, cancellationToken).ConfigureAwait(false);
await ws.SendAsync(bodyEncoded, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
}
catch (Exception e) when (e is ObjectDisposedException || e is SocketException || e is IOException)
{
Expand All @@ -127,28 +172,49 @@ public async IAsyncEnumerable<NiconicoCommentXmlTag> Receive(string liveId, [Enu
throw new NetworkNicoLiveCommentReceiverException(e);
}

//コメント受信ループ
// 1分間毎に0byteのKeepAliveコマンドを送信。
SendBlankAliveMessage(ws, cancellationToken);

//情報取得待ちループ
while (true)
{
char[] buf = new char[2048];
int receivedByte;
try
var segment = new ArraySegment<byte>(buffer);
var result = await ws.ReceiveAsync(segment, cancellationToken);

//エンドポイントCloseの場合、処理を中断
if (result.MessageType == WebSocketMessageType.Close)
{
receivedByte = await socketReader.ReadAsync(buf, 0, buf.Length).ConfigureAwait(false);
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "OK",
cancellationToken);
break;
}
catch (Exception e) when (e is ObjectDisposedException || e is SocketException || e is IOException)

//バイナリの場合は、当処理では扱えないため、処理を中断
if (result.MessageType == WebSocketMessageType.Binary)
{
await ws.CloseAsync(WebSocketCloseStatus.InvalidMessageType,
"Binary not supported.", cancellationToken);
break;
}

int count = result.Count;
while (!result.EndOfMessage)
{
if (cancellationToken.IsCancellationRequested)
throw new OperationCanceledException(null, e, cancellationToken);
if (e is ObjectDisposedException)
throw;
else
throw new NetworkNicoLiveCommentReceiverException(e);
if (count >= buffer.Length)
{
await ws.CloseAsync(WebSocketCloseStatus.InvalidPayloadData,
"That's too long", cancellationToken);
throw new ConnectionClosedNicoLiveCommentReceiverException();
}
segment = new ArraySegment<byte>(buffer, count, buffer.Length - count);
result = await ws.ReceiveAsync(segment, cancellationToken);

count += result.Count;
}
if (receivedByte == 0)
break; // 4時リセットかもしれない→もう一度試す

this.parser.Push(new string(buf[..receivedByte]));
//メッセージを取得
var message = Encoding.UTF8.GetString(buffer, 0, count);
this.parser.Push(message);
while (this.parser.DataAvailable())
yield return this.parser.Pop();
}
Expand All @@ -162,6 +228,8 @@ public void Dispose()
}

private readonly HttpClient httpClient;
private readonly NiconicoCommentXmlParser parser = new NiconicoCommentXmlParser(true);
private readonly NiconicoCommentJsonParser parser = new NiconicoCommentJsonParser(true);
private readonly string WEBSOCKET_PROTOCOL = "msg.nicovideo.jp#json";
private readonly string WEBSOCKET_EXTENSIONS = "permessage-deflate; client_max_window_bits";
}
}
79 changes: 79 additions & 0 deletions TVTComment/Model/NiconicoUtils/NiconicoCommentJsonParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using Newtonsoft.Json.Linq;
using System.Collections.Generic;

namespace TVTComment.Model.NiconicoUtils
{
class NiconicoCommentJsonParser
{
private bool socketFormat;
private Queue<NiconicoCommentXmlTag> chats = new Queue<NiconicoCommentXmlTag>();
private string buffer;

/// <summary>
/// <see cref="NiconicoCommentJsonParser"/>を初期化する
/// </summary>
/// <param name="socketFormat">ソケットを使うリアルタイムのデータ形式ならtrue 過去ログなどのデータ形式ならfalse</param>
public NiconicoCommentJsonParser(bool socketFormat)
{
this.socketFormat = socketFormat;
}

public void Push(string str)
{
if (socketFormat)
{
// 一旦、コメント関連データのみ解析する
if (str.StartsWith("{\"chat"))
{
chats.Enqueue(getChatJSONTag(str));
}
}
else
{
// サポートしない,
}
}

/// <summary>
/// 解析結果を返す <see cref="socketFormat"/>がfalseなら<see cref="ChatNiconicoCommentXmlTag"/>しか返さない
/// </summary>
/// <returns>解析結果の<see cref="NiconicoCommentXmlTag"/></returns>
public NiconicoCommentXmlTag Pop()
{
return chats.Dequeue();
}

/// <summary>
/// <see cref="Pop"/>で読みだすデータがあるか
/// </summary>
public bool DataAvailable()
{
return chats.Count > 0;
}

public void Reset()
{
buffer = string.Empty;
chats.Clear();
}

private static ChatNiconicoCommentXmlTag getChatJSONTag(string str) {
JObject jsonObj = JObject.Parse(str);

int vpos = jsonObj["chat"]["vpos"] == null ? 0 : int.Parse(jsonObj["chat"]["vpos"].ToString()); //ニコ生側の不具合で稀に必須項目のvposが抜けてるデータが流れてくる可能性があるので念の為JSONキー確認する。
long date = long.Parse(jsonObj["chat"]["date"].ToString());
int dateUsec = jsonObj["chat"]["date_usec"] == null ? 0 : int.Parse(jsonObj["chat"]["date_usec"].ToString());
string mail = jsonObj["chat"]["mail"] == null ? "" : jsonObj["chat"]["mail"].ToString();
string userId = jsonObj["chat"]["user_id"].ToString();
int premium = jsonObj["chat"]["premium"] == null ? 0 : int.Parse(jsonObj["chat"]["premium"].ToString());
int anonymity = jsonObj["chat"]["anonymity"] == null ? 0 : int.Parse(jsonObj["chat"]["anonymity"].ToString());
int abone = jsonObj["chat"]["abone"] == null ? 0 : int.Parse(jsonObj["chat"]["abone"].ToString());
string content = (string)jsonObj["chat"]["content"];
int no = int.Parse(jsonObj["chat"]["no"].ToString());

return new ChatNiconicoCommentXmlTag(
content, 0, no, vpos, date, dateUsec, mail, userId, premium, anonymity, abone
);
}
}
}
1 change: 1 addition & 0 deletions TVTComment/TVTComment.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<PackageReference Include="AttachedCommandBehavior" Version="2.0.0" />
<PackageReference Include="CommonServiceLocator" Version="1.3.0" />
<PackageReference Include="Microsoft.Xml.SgmlReader" Version="1.8.18" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Prism.Core" Version="6.3.0-pre1" />
<PackageReference Include="Prism.Unity" Version="6.3.0-pre1" />
<PackageReference Include="Prism.Wpf" Version="6.3.0-pre1" />
Expand Down