C#によるgRPC通信の双方向ストリーミングのサンプルコード

「C#によるgRPC通信のサンプルコード」に戻る

RPCサービスの双方向ストリーミング・メソッドのサンプルを作成した。

(注意:2024年1月15日更新→.NET5.0はサポート終了しました。サンプルコードは.NET8.0に更新しました)

機能はクライアントからアップロードしたバイナリファイルのデータを、バイト単位でビット反転して、クライアントにそのまま送り返してくるというものだ。返されたデータは別のファイルに保存する。

これまでの gRPC のサンプルソースに双方向ストリーミング・メソッドを追加した。

以下がサンプルコードへのGitHubリンクになる。

gRPCサービス側

Release v1.7.1.0 · motoi-tsushima/Sample_GrpcService
.NET 8.0 により、リビルドしました。

WPFクライアント側

Release v1.7.1.0 · motoi-tsushima/Sample_gRPC_WpfApp
.NET 8.0 により、リビルドしました。

前回も前々回も紹介したが、テスト用に自由にファイルサイズを指定してテスト用のバイナリファイルを作成するツールを作成した。

以下がそのツールのソースと実行ファイルである。

Release v1.1.0 · motoi-tsushima/dummyfile
.NET 8.0 でリビルドしました。

コンソールで、

dummyfile testfile.bin /s:500M /d

と入力すると「testfile.bin」というダミーファイルを作成する。

/s はファイルサイズを指定し、/d はファイル内部にダミーデータを書き込む事を指定する。

ダミーデータは、0x01から0xFFまでの数値を繰り返し書き込む。

全て .NET Core6.0 で稼働する。

サンプルコードの機能と使い方

今回、追加した機能はこれまでと同じように既存のサンプルに追加して作成している。

gRPCサービス側を起動しておき、WPFクライアント側を起動して操作する。

画面全体は以下のようになる。

その内、今回追加したのが以下の部分だ。

この機能は、ユーザーが選択したファイルを gRPCサービスへアップロードし、サービス側でそのデータをバイト単位でビット反転して、そのままクライアントへ送り返す。

クライアント側で受け取ったデータは「ダウンロードファイル名」に示されたファイルへ保存する。

送信するデータ量と、折り返し受信するデータ量が同じになる仕様なので、ダウンロードするファイルサイズは、アップロードするファイルサイズと同じになる。

「ダウンロードファイル名設定」ボタンでダウンロードファイル名を選択する。

デフォルトではファイル名だけ設定しているが、そのままではパスの指定が行われていないので、「ダウンロードファイル名設定」ボタンでパスの指定をしてから実行して欲しい。

ダウンロードファイルのパスを指定したら、「ファイルアップロード」ボタンでアップロード対象となるファイルを選択する。

ダウンロードファイルとは別のファイル名にして欲しい。

実行する前に dummyfile コマンドでテスト用ファイルを作成しておいてください。

それをアップロードします。

この機能はサンプルコードとして最も単純な処理になるように、送信と受信のデータ量を同じにしているが、実用では送信と受信は互いに独立しているので、送信と受信のデータ量を同じにする必要はない。

タイミングも送信と受信で同期を取る必要はない。

互いにバラバラに動作するようにプログラミングする事もできる。

スレッドはそれぞれ独立しているからだ。

gRPCサービスの解説

追加したコードの解説をする。

[greet.proto]の編集

[greet.proto]へ追加したのは以下のコードになる。


service Greeter {
  //Bidirectional Streaming
  rpc BidirectionalStream(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse);
}

双方向ストリーミング・メソッドは引数にも、返り値にも「stream」を宣言する。

追加した「message」は以下になる。


//Bidirectional Streaming Request
message BidirectionalStreamRequest{
	string request = 1;
	bytes binary = 2;
	int64  binarySize = 3;
}

//Bidirectional Streaming Response
message BidirectionalStreamResponse{
	string result = 1;
	bytes binary = 2;
	int64  binarySize = 3;
}

こちらの宣言の仕方は単項メソッドと変わらない。

BidirectionalStreamRequest と BidirectionalStreamResponse の「bytes binary」はアップロードするファイルを分割してダウンロードするためのパケットの格納容器になる。

双方向ストリーミングでは stream BidirectionalStreamRequest を繰り返しサーバーへ送信する事で、大量のデータを送信し、

同時にサーバーから stream BidirectionalStreamResponse を繰り返しクライアントへ送信する事で、同様に大量のデータを受信する。

どちらも繋ぎっぱなしの継続的回線である。

実際の処理も互いにマルチスレッドで分かれて、それぞれループ処理になっている。

[GreeterService.cs]への編集

rpc BidirectionalStream に対応するメソッドは以下になる。


/// <summary>
/// 双方向ストリーミング メソッド
/// </summary>
/// <param name="requestStream">リクエスト ストリーム</param>
/// <param name="responseStream">レスポンス ストリーム</param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task BidirectionalStream(
    IAsyncStreamReader<BidirectionalStreamRequest> requestStream,
    IServerStreamWriter<BidirectionalStreamResponse> responseStream, 
    ServerCallContext context)
{
    // リクエストストリームを受信するループ。
    await foreach (var message in requestStream.ReadAllAsync())
    {
        //終了確認
        string requestString = message.Request;
        if(requestString == "end")
        {
            break;
        }

        // 受信したパケットのデータをバイト単位でビット反転する。
        byte[] bin = message.Binary.ToArray<byte>();
        long binSize = message.BinarySize;

        byte[] sendBin = new byte[binSize];

        for(long i = 0; i < binSize; i++)
        {
            sendBin[i] = (byte)(bin[i] ^ (byte)0xFF);
        }

        // レスポンスを返す。
        BidirectionalStreamResponse response = new BidirectionalStreamResponse();
        response.Binary = Google.Protobuf.ByteString.CopyFrom(sendBin);
        response.BinarySize = binSize;
        response.Result = "success";

        await responseStream.WriteAsync(response);
    }
}

双方向ストリーミングは、クライアントストリーミングとサーバーストリーミングを同時に行っている。

クライアントストリーミングは、

IAsyncStreamReader<BidirectionalStreamRequest> requestStream

からパケットを受け取る処理になる。

サーバーストリーミングは、

IServerStreamWriter<BidirectionalStreamResponse> responseStream

へパケットを送信する処理になる。

それぞれの中核処理は await を宣言して非同期で実行している。

マルチスレッドで、クライアントストリーミングとサーバーストリーミングを、それぞれ独立したスレッドで実施できる事を意味する。

今回は、クライアントストリーミング処理である

await foreach (var message in requestStream.ReadAllAsync())

のループの中で、サーバーストリーミング処理の

 await responseStream.WriteAsync(response);

を呼び出しているので、両者のタイミングが一致しているが、Task.Run(ラムダ式) などを使用して、サーバーストリーミング処理をクライアントストリーミング処理のループの外で実行する事も可能である。

やり方はかなり自由だ。

但し「非同期プログラミング」の初歩的知識は必須となる。初歩で良く、深く知る必要はない。

async , await, Task が分からない人は勉強した方が良い

ラムダ式の知識も必要になる場合がある。

クライアントストリーミングの解説でも出てきたが、 requestStream.ReadAllAsync() は、クライアントからのストリーミングを非同期に一括で受け取りコレクションで保持する。

それを foreach のループで取得する。

一括と言ってもバックグラウンドで常駐して受信を待っている。

プログラム的にはコレクションとして扱えると言うことだ。

「foreach (var message in requestStream.ReadAllAsync())」で受け取ったリクエストストリーム(クライアントストリーミング)を、ループの中で、

byte[] bin = message.Binary.ToArray<byte>();
long binSize = message.BinarySize;

のように .NET型に変換して、bin 配列の値をバイト単位でビット反転する。


        byte[] sendBin = new byte[binSize];

        for(long i = 0; i < binSize; i++)
        {
            sendBin[i] = (byte)(bin[i] ^ (byte)0xFF);
        }

あとは、ビット反転した sendBin を BidirectionalStreamResponse クラスに格納して、クライアントへ送信する。


        // レスポンスを返す。
        BidirectionalStreamResponse response = new BidirectionalStreamResponse();
        response.Binary = Google.Protobuf.ByteString.CopyFrom(sendBin);
        response.BinarySize = binSize;
        response.Result = "success";

        await responseStream.WriteAsync(response);

これはサーバーストリーミングである。

繰り返すが、このサーバーストリーミング処理は、必ずしもクライアントストリーミングと同時に行う必要はないし、このループの中で実行しなくても良い。

サーバー側の処理は以上です。

WPFクライアント側のコード解説

この画面のXAMLは以下のようになる。

[MainWindow.xaml]


<StackPanel Orientation="Vertical" Grid.Row="14" Height="50" VerticalAlignment="Top"  Background="Violet">
    <StackPanel Orientation="Horizontal" Height="25" VerticalAlignment="Top"  Background="Violet">
        <TextBlock Text="双方向ストリーミング" Width="170" TextAlignment="Left" VerticalAlignment="Top" Margin="10,0,20,0" FontSize="18" />
        <Button x:Name="BidiUploadButton" Content="ファイル アップロード" Width="100" Margin="30,0,0,0" VerticalAlignment="Center" Click="BidiUploadButton_Click"/>
        <Button x:Name="CancelBidiButton" Content="キャンセル" Width="100" Margin="30,0,0,0" Click="CancelBidiButton_Click" IsEnabled="False" VerticalAlignment="Center"/>
        <TextBlock x:Name="BidiMessage" Text="" Width="550" Margin="10,0,0,0" TextAlignment="Left" VerticalAlignment="Center"/>
    </StackPanel>
    <StackPanel Orientation="Horizontal" Height="25" VerticalAlignment="Top"  Background="Violet">
        <TextBlock Margin="30,0,0,0" Text="ダウンロードファイル名" Width="125" TextAlignment="Left" VerticalAlignment="Center" />
        <TextBox x:Name="BidiDownloadTextBox" Width="300" TextAlignment="Left" Text="download.bin" VerticalAlignment="Center" />
        <Button x:Name="BidiDownloadButton" Content="ダウンロードファイル名設定" Width="150" Margin="30,0,0,0" VerticalAlignment="Center" Click="BidiDownloadButton_Click"/>
        <TextBlock x:Name="BidiDownloadMessage" Text="" Width="550" Margin="10,0,0,0" TextAlignment="Left" VerticalAlignment="Center"/>
    </StackPanel>
</StackPanel>

「ダウンロードファイル名」の BidiDownloadTextBox にサーバーストリーミングで送り返されてくるビット反転したデータの保存先ファイル名を表示している。

「ダウンロードファイル名設定」BidiDownloadButton ボタンで、「ダウンロードファイル名」のフルパスを設定する。

「ファイル アップロード」BidiUploadButton ボタンが、送信ファイルをダイアログで選択し、双方向ストリーミング処理を実行するメイン処理となる。

「キャンセル」CancelBidiButton ボタンで、双方向ストリーミング処理を中断する。

BidiMessage に処理状況を表示する。

メッセージボックスの代りである。

次にコードビハインドの追加コードを以下に示す。

[MainWindow.xaml.cs]


/// <summary>
/// 双方向ストリーミング実行
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void BidiUploadButton_Click(object sender, RoutedEventArgs e)
{
    AsyncDuplexStreamingCall<BidirectionalStreamRequest, BidirectionalStreamResponse> _callBidiStream;
    string filePath;
    string fileName;
    const int BufferSize = 10240;
    byte[] bin = new byte[BufferSize];

    this._bidiCanceled = false;

    OpenFileDialog openFileDialog = new OpenFileDialog();

    var dlg = openFileDialog.ShowDialog();
    if (dlg == false)
    {
        return;
    }

    filePath = openFileDialog.FileName;
    fileName = System.IO.Path.GetFileName(filePath);

    this.BidiUploadButton.IsEnabled = false;
    this.CancelBidiButton.IsEnabled = true;
    this.BidiDownloadButton.IsEnabled = false;
    this.BidiMessage.Text = fileName + " アップロード中";

    // gRPC メッセージ 宣言
    BidirectionalStreamRequest bidiRequest = new BidirectionalStreamRequest();
    bidiRequest.Request = "normal";
    bidiRequest.BinarySize = BufferSize;

    // gRPC サービスを呼び出す。
    _callBidiStream = this.grpcClient.GreeterClient.BidirectionalStream();

    // 非同期レスポンス受信とファイル出力
    string writeFileName = this.BidiDownloadTextBox.Text;
    string responseMessage = "";

    var readTask = Task.Run(async () =>
    {
        FileStream wfs = null;

        try
        {
            await foreach (var message in _callBidiStream.ResponseStream.ReadAllAsync())
            {
                if (this._bidiCanceled)
                {
                    break;
                }

                if (wfs == null)
                {
                    wfs = new FileStream(writeFileName, FileMode.Create, FileAccess.Write);
                }

                byte[] wbin = message.Binary.ToByteArray();

                wfs.Write(wbin, 0, (int)message.BinarySize);
            }
        }
        catch(Exception ex)
        {
            responseMessage = ex.Message;
        }
        finally
        {
            if (wfs != null)
            {
                wfs.Close();
                wfs.Dispose();
            }
        }
    });

    //ファイルアップロードと非同期リクエストストリーム
    using (var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read))
    {
        int sendSize = 0;
        int readSize = 0;

        while ((readSize = fs.Read(bin, 0, BufferSize)) > 0)
        {
            if (this._bidiCanceled || readTask.IsCompleted || readTask.IsCanceled)
            {
                break;
            }

            bidiRequest.Binary = Google.Protobuf.ByteString.CopyFrom(bin);

            await _callBidiStream.RequestStream.WriteAsync(bidiRequest);
            await Task.Delay(TimeSpan.FromMilliseconds(10));

            this.BidiMessage.Text = fileName + " 双方向ストリーミング中 / Send Byte=" + (sendSize += readSize);
        }

        await _callBidiStream.RequestStream.CompleteAsync();
    }

    //後始末
    _callBidiStream.Dispose();

    this.BidiUploadButton.IsEnabled = true;
    this.CancelBidiButton.IsEnabled = false;
    this.BidiDownloadButton.IsEnabled = true;

    if (this._bidiCanceled)
    {
        this.BidiMessage.Text = "キャンセルしました";
        this.BidiDownloadMessage.Text = "";
    }
    else
    {
        this.BidiMessage.Text = "ストリーミング完了";
        this.BidiDownloadMessage.Text = responseMessage;
    }
}

/// <summary>
/// 双方向ストリーミングのダウンロードファイル名設定
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void BidiDownloadButton_Click(object sender, RoutedEventArgs e)
{
    SaveFileDialog saveFileDialog = new SaveFileDialog();
    saveFileDialog.FileName = this.BidiDownloadTextBox.Text;

    if (this.BidiDownloadTextBox.Text.Length == 0)
    {
        return;
    }

    if (saveFileDialog.ShowDialog() == false)
        return;

    this.BidiDownloadTextBox.Text = saveFileDialog.FileName;

}

/// <summary>
/// キャンセルした
/// </summary>
private bool _bidiCanceled = false;

/// <summary>
/// 双方向ストリーミング中断
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void CancelBidiButton_Click(object sender, RoutedEventArgs e)
{
    this._bidiCanceled = true;
}

双方向ストリーミングのメイン処理は BidiUploadButton_Click になる。

gRPCメソッドの呼び出し方は単項メソッドとは異なり、呼び出しでは引数を指定しない。


    // gRPC サービスを呼び出す。
    _callBidiStream = this.grpcClient.GreeterClient.BidirectionalStream();

クライアント側のコードは、クライアントストリーミングとサーバーストリーミングの処理を明白に別スレッドでコーディングしている。

サーバー側で両者を互いに独立してコーディングするなら、この書き方を参考にすると良い。

クライアントストリーミングとサーバーストリーミングの処理はどちらも BufferSize のバイト数ごとに、パケットをループで読み込み、送信用又は受信用のパケットを作成する。

先に、サーバーストリーミングの処理を書いている。

    // 非同期レスポンス受信とファイル出力
    string writeFileName = this.BidiDownloadTextBox.Text;
    string responseMessage = "";

    var readTask = Task.Run(async () =>
    {
        FileStream wfs = null;

        try
        {
            await foreach (var message in _callBidiStream.ResponseStream.ReadAllAsync())
            {
                if (this._bidiCanceled)
                {
                    break;
                }

                if (wfs == null)
                {
                    wfs = new FileStream(writeFileName, FileMode.Create, FileAccess.Write);
                }

                byte[] wbin = message.Binary.ToByteArray();

                wfs.Write(wbin, 0, (int)message.BinarySize);
            }
        }
        catch(Exception ex)
        {
            responseMessage = ex.Message;
        }
        finally
        {
            if (wfs != null)
            {
                wfs.Close();
                wfs.Dispose();
            }
        }
    });

Task.Run の中でラムダ式により、サーバーストリーミング処理自体を親スレッドから独立した、子スレッドにしてコーディングしている。

この子スレッドは、gRPCサービスからの受信が終わるまで無限にループし、バックグラウンドで常駐する。

次に、クライアントストリーミング(リクエストストリーム)のコーディングをしている。


    //ファイルアップロードと非同期リクエストストリーム
    using (var fs = new FileStream(filePath, FileMode.Open, FileAccess.Read))
    {
        int sendSize = 0;
        int readSize = 0;

        while ((readSize = fs.Read(bin, 0, BufferSize)) > 0)
        {
            if (this._bidiCanceled || readTask.IsCompleted || readTask.IsCanceled)
            {
                break;
            }

            bidiRequest.Binary = Google.Protobuf.ByteString.CopyFrom(bin);

            await _callBidiStream.RequestStream.WriteAsync(bidiRequest);
            await Task.Delay(TimeSpan.FromMilliseconds(10));

            this.BidiMessage.Text = fileName + " 双方向ストリーミング中 / Send Byte=" + (sendSize += readSize);
        }

        await _callBidiStream.RequestStream.CompleteAsync();
    }

送信ファイルを FileStream で開き、fs.Read(bin, 0, BufferSize) でパケット単位にファイルデータを読み込む。

読み込んだパケットを、以下の処理で Protobuf 仕様に変換し、

bidiRequest.Binary = Google.Protobuf.ByteString.CopyFrom(bin);

以下のように、非同期でサーバーへ送信する。

await _callBidiStream.RequestStream.WriteAsync(bidiRequest);
await Task.Delay(TimeSpan.FromMilliseconds(10));

Task.Delay は例によってキャンセルを実施する為にスリープしているだけで、必須ではない。

クライアントストリーミングは、以下のように処理終了時にサーバーへ終了を通知してやる必要がある。

await _callBidiStream.RequestStream.CompleteAsync();

キャンセル処理は単純に「フラグ」を立てて、ループの中でそのフラグを確認して、立っていたら終了する。

クライアント側で制御できるので、中断処理は単純だ。


/// <summary>
/// キャンセルした
/// </summary>
private bool _bidiCanceled = false;

/// <summary>
/// 双方向ストリーミング中断
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void CancelBidiButton_Click(object sender, RoutedEventArgs e)
{
    this._bidiCanceled = true;
}

サーバーストリーミングのキャンセル処理


            await foreach (var message in _callBidiStream.ResponseStream.ReadAllAsync())
            {
                if (this._bidiCanceled)
                {
                    break;
                }

クライアントストリーミングのキャンセル処理


        while ((readSize = fs.Read(bin, 0, BufferSize)) > 0)
        {
            if (this._bidiCanceled || readTask.IsCompleted || readTask.IsCanceled)
            {
                break;
            }

双方向ストリーミングのサンプルでした

以上で、双方向ストリーミングのサンプルの解説を終わる。

これで、gRPC の主要な処理の解説は、だいたいカバーしたと思う。

スマートクライアントや、アプリで通信部分に WCF や、REST-API などを使用している人は、gRPC への移行を余儀なくされると思う。

そういう人は、これまで解説した gRPC の使い方を参考にして貰うと、余計な時間をかけずに gRPC を導入する事ができると思う。

お役に立てば幸いだ。

尚、非同期プログラミングについて良く分からない人には、以下の電子書籍がお勧めです。

「C#で非同期プログラミングをする方法: Thread,ThreadPoolからTask,async awaitまでを分かりやすく解説 Kindle版」

Kindle Unlimited 対応なので、Unlimited会員なら無料で読めます。

購入しても 250円と格安です。

また、ページ数も少なく、必要最小限の解説しかしていないので、非同期プログラミングを一日で習得できます。

非同期プログラミングは奥が深いですが、双方向ストリーミングに使用する程度なら、この本の解説で十分です。

C#で非同期プログラミングをする方法: Thread,ThreadPoolからTask,async awaitまでを分かりやすく解説 Kindle版

「C#によるgRPC通信のサンプルコード」に戻る

タイトルとURLをコピーしました