Elasticsearch 系列 - 使用 C# / Elasticsearch.Net 來新增文件記錄到 Elasticsearch 資料庫
之前寫了一篇關於 Elasticsearch 系列 - 使用 C# 來新增文件記錄到 Elasticsearch 資料庫 文章,原本是想要撰寫一些關於使用 .NET client for Elasticsearch (v8 .NET Client) 來進行 Elasticsearch 的相關 CRUD 新增、查詢、修改、刪除操作;不過,由於在官方網站與網路資源上,可以查看到的參考文件與說明內容,真的少得可憐,迫於事實情況,我決定還是要採用 Elasticsearch v7 的 .NET Client 則是通稱為 (NEST) client 來寫出相關文件的 CRUD 新增、查詢、修改、刪除程式設計作法。
建立測試專案
請依照底下的操作,建立起這篇文章需要用到的練習專案
- 打開 Visual Studio 2022 IDE 應用程式
- 從 [Visual Studio 2022] 對話窗中,點選右下方的 [建立新的專案] 按鈕
- 在 [建立新專案] 對話窗右半部
- 切換 [所有語言 (L)] 下拉選單控制項為 [C#]
- 切換 [所有專案類型 (T)] 下拉選單控制項為 [主控台]
- 在中間的專案範本清單中,找到並且點選 [主控台應用程式] 專案範本選項
專案,用於建立可在 Windows、Linux 及 macOS 於 .NET 執行的命令列應用程式
- 點選右下角的 [下一步] 按鈕
- 在 [設定新的專案] 對話窗
- 找到 [專案名稱] 欄位,輸入
csElasticsearchNestCreate
作為專案名稱 - 在剛剛輸入的 [專案名稱] 欄位下方,確認沒有勾選 [將解決方案與專案至於相同目錄中] 這個檢查盒控制項
- 點選右下角的 [下一步] 按鈕
- 現在將會看到 [其他資訊] 對話窗
- 在 [架構] 欄位中,請選擇最新的開發框架,這裡選擇的 [架構] 是 :
.NET 8.0 (標準字詞支援)
- 在這個練習中,需要去勾選 [不要使用最上層陳述式(T)] 這個檢查盒控制項
這裡的這個操作,可以由讀者自行決定是否要勾選這個檢查盒控制項
- 請點選右下角的 [建立] 按鈕
稍微等候一下,這個主控台專案將會建立完成
安裝要用到的 NuGet 開發套件
因為開發此專案時會用到這些 NuGet 套件,請依照底下說明,將需要用到的 NuGet 套件安裝起來。
安裝 Elasticsearch.Net 套件
這個 Elasticsearch.Net 套件是 Elasticsearch 的低階 .NET 客戶端,與 NEST 不同,它提供了更多的彈性和直接控制,但也意味著需要手動處理較多的細節。
請依照底下說明操作步驟,將這個套件安裝到專案內
- 滑鼠右擊 [方案總管] 視窗內的 [專案節點] 下方的 [相依性] 節點
- 從彈出功能表清單中,點選 [管理 NuGet 套件] 這個功能選項清單
- 此時,將會看到 [NuGet: csElasticsearchNestCreate] 視窗
- 切換此視窗的標籤頁次到名稱為 [瀏覽] 這個標籤頁次
- 在左上方找到一個搜尋文字輸入盒,在此輸入
Elasticsearch.Net
- 在視窗右方,將會看到該套件詳細說明的內容,其中,右上方有的 [安裝] 按鈕
- 點選這個 [安裝] 按鈕,將這個套件安裝到專案內
安裝 NEST 套件
這個 NEST 套件是 Elasticsearch 的官方 .NET 高階客戶端。它是一個強大的、易於使用的 .NET 庫,旨在與 Elasticsearch 交互。NEST 提供了一個豐富的 .NET 接口,使得在 .NET 應用中與 Elasticsearch 進行通信變得容易和直觀。
請依照底下說明操作步驟,將這個套件安裝到專案內
- 滑鼠右擊 [方案總管] 視窗內的 [專案節點] 下方的 [相依性] 節點
- 從彈出功能表清單中,點選 [管理 NuGet 套件] 這個功能選項清單
- 此時,將會看到 [NuGet: csElasticsearchNestCreate] 視窗
- 切換此視窗的標籤頁次到名稱為 [瀏覽] 這個標籤頁次
- 在左上方找到一個搜尋文字輸入盒,在此輸入
NEST
- 在視窗右方,將會看到該套件詳細說明的內容,其中,右上方有的 [安裝] 按鈕
- 點選這個 [安裝] 按鈕,將這個套件安裝到專案內
建立要使用的程式碼
- 在 [方案總管] 內找到並且開啟 [Program.cs] 檔案這個節點
- 使用底下 C# 程式碼,將原本的程式碼取代掉
using Nest;
using System.Diagnostics;
namespace csElasticsearchNestCreate;
[ElasticsearchType(IdProperty = nameof(BlogId))]
public class Blog
{
public int BlogId { get; set; }
public string Title { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public DateTime CreateAt { get; set; } = DateTime.Now;
public DateTime UpdateAt { get; set; } = DateTime.Now;
}
internal class Program
{
static async Task Main(string[] args)
{
var settings = new ConnectionSettings(new Uri("http://10.1.1.231:9200/"))
.DisableDirectStreaming()
.BasicAuthentication("elastic", "elastic");
var client = new ElasticClient(settings);
string indexName = "blogs".ToLower();
// 嘗試讓 client 物件與後端 Elasticsearch 來通訊,避免第一次的延遲
await client.Indices.DeleteAsync(indexName);
// 建立 index
await client.IndexAsync<Blog>(new Blog()
{
BlogId = 999,
Title = $"Nice to meet your 999",
Content = $"Hello Elasticsearch 999",
CreateAt = DateTime.Now.AddDays(999),
UpdateAt = DateTime.Now.AddDays(999),
}, idx=>idx.Index(indexName));
Stopwatch stopwatch = new Stopwatch();
#region 每次新增一筆文件,共 100 次
stopwatch.Restart();
for (int i = 0; i < 100; i++)
{
Blog blog = new Blog()
{
BlogId = i,
Title = $"Nice to meet your {i}",
Content = $"Hello Elasticsearch {i}",
CreateAt = DateTime.Now.AddDays(i),
UpdateAt = DateTime.Now.AddDays(i),
};
var response = await client.IndexAsync(blog,
idx => idx.Index(indexName));
if (response.IsValid)
{
//Console.WriteLine($"Index document with ID {response.Id} succeeded.");
}
else
{
Console.WriteLine($"Error Message : {response.DebugInformation}");
}
}
stopwatch.Stop();
// 顯示需要耗費時間
Console.WriteLine($"新增 100 次文件需要 {stopwatch.ElapsedMilliseconds} ms");
#endregion
#region 一次新增 100 筆文件
stopwatch.Restart();
Console.WriteLine();
List<Blog> list = new List<Blog>();
for (int i = 0; i < 100; i++)
{
int foo = i + 10000;
Blog blog = new Blog()
{
BlogId = foo,
Title = $"Nice to meet your (Bulk) {foo}",
Content = $"Hello Elasticsearch (Bulk) {foo}",
CreateAt = DateTime.Now.AddDays(i),
UpdateAt = DateTime.Now.AddDays(i),
};
list.Add(blog);
}
var response2 = await client
.BulkAsync(b => b.Index(indexName).IndexMany(list));
if (response2.IsValid)
{
//Console.WriteLine($"Index document with ID {response.Id} succeeded.");
}
else
{
//Console.WriteLine($"Error Message : {response2.DebugInformation}");
}
stopwatch.Stop();
// 顯示需要耗費時間
Console.WriteLine($"新增 100 次文件需要 {stopwatch.ElapsedMilliseconds} ms");
#endregion
}
}
對於這個 Index 的模型,將會定義在 [Blog] 這個類別內,由於 NEST 預設將會使用 [Id] 這樣的欄位作為唯一識別碼,不過,在這個 [Blog] 類別內,卻沒有這樣的屬性存在,而是有個 [BlogId] 這樣的屬性,所以,我們需要在這個類別內使用 ElasticsearchType
這個屬性來指定這個欄位為唯一識別碼。
在此,將會在該類別之外,使用 [ElasticsearchType(IdProperty = nameof(BlogId))]
這樣的屬性來指定這個欄位為唯一識別碼。
對於這個 [Blog] 類別內的屬性,將會定義如下
- [BlogId] : 這個屬性將會作為唯一識別碼
- [Title] : 這個屬性將會作為標題
- [Content] : 這個屬性將會作為內容
- [CreateAt] : 這個屬性將會作為建立時間
- [UpdateAt] : 這個屬性將會作為更新時間
接下來就是這個程式進入點內的程式碼,首先,將會建立一個 [ConnectionSettings] 物件,用來宣告與 Elasticsearch 進行通訊的相關設定,這個物件將會傳入一個 Uri 物件,這個 Uri 物件將會指定 Elasticsearch 的伺服器位址。
由於這台 Elasticsearch 的伺服器,有設定帳號與密碼,所以,這裡將會使用 BasicAuthentication
這個方法,來指定帳號與密碼。
一旦得到 [ConnectionSettings] 物件,接下來的工作將會是要建立一個 [ElasticClient] 這個類別,這個類別將會是與 Elasticsearch 進行通訊的主要類別,這個類別的建構式,將會需要傳入一個 [ConnectionSettings] 類別的物件,這個物件將會是用來設定與 Elasticsearch 進行通訊的相關設定。
接下來,因為底下的許多操作都需要指定在 Elasticsearch 內的索引名稱,因此,將會建立一個字串變數 [indexName],這個變數將會是用來指定要操作的 Index 名稱,這裡將會指定為 blogs
。這裡有使用 ToLower()
方法,將這個字串轉換為小寫字串,這是因為在 Elasticsearch 內,對於所有的 Index 名稱,都是使用小寫字母來表示。
由於這是一個練習專案,用來學習如何透過 C# 與 NEST 來操作 Elasticsearch,因此,這裡將會使用 await client.Indices.DeleteAsync(indexName);
這個方法,來刪除這個 Index,這樣的操作,將會讓這個專案可以重複執行,而不會因為 Index 已經存在而產生錯誤;另外,也可以透過 [Kibana] 來觀察所新增的紀錄是否成功的產生在 Elasticsearch 資料庫內。
在這個專案內,將會展示兩種新增文件的做法,這兩種作法將會新增 100 筆文件到 Elasticsearch 資料庫內,第一種作法是每次新增一筆文件,共 100 次,也就是說,需要呼叫 Elasticsearch API 100 次;第二種作法是一次新增 100 筆文件,也就是說,先將這 100 份文件儲存到 .NET 集合 Collection 物件內,再呼叫一次 Elasticsearch API,將這 100 份文件一次新增到 Elasticsearch 內。
在這個專案內,將會使用 Stopwatch
這個類別,來計算這兩種作法所需要的時間,而為了避免量測上的誤差,因此,將會在執行每一種作法之前,先呼叫 Stopwatch.Restart()
這個方法,來重置計時器,接著,再呼叫 Stopwatch.Stop()
這個方法,來停止計時器,最後,再呼叫 Stopwatch.ElapsedMilliseconds
這個屬性,來取得所耗費的時間。
另外,會在程式第一次啟動後,並且刪除了這個索引之後,使用 await client.IndexAsync<Blog>
這個方法,來新增一筆文件到 Elasticsearch 內,這樣的作法,將會讓 Elasticsearch 內的索引,可以自動建立起來,這樣的作法,將會讓這個專案可以重複執行,而不會因為 Index 已經存在而產生錯誤。並且因為這個 [ElasticClient] 物件已經與 Elasticsearch 伺服器通訊過了,省卻其他第一次初始化需要的工作。
底下將會為故意新增一筆文件到 Elasticsearch 資料庫內的程式碼
await client.IndexAsync<Blog>(new Blog()
{
BlogId = 999,
Title = $"Nice to meet your 999",
Content = $"Hello Elasticsearch 999",
CreateAt = DateTime.Now.AddDays(999),
UpdateAt = DateTime.Now.AddDays(999),
}, idx=>idx.Index(indexName));
接下來要做到 [每次新增一筆文件,共 100 次] 的程式碼設計,在這裡將會建立一個執行 100 次的迴圈,在這個迴圈內,每次會建立一個 [Blog] 物件,在這個物件內的屬性值,每次都會不相同,尤其要確保 BlogId 這個屬性值不能夠重複,因為這是這個索引的主鍵唯一值。
一旦生成成了這個 [Blog] 類別物件,就可以呼叫 var response = await client.IndexAsync(blog, idx => idx.Index(indexName));
這個敘述,將這個 .NET 物件新增到 Elasticsearch 資料庫內的指定索引內,形成該索引內的一個文件。這個方法將會回傳一個型別為 [IndexResponse] 的物件,透過這個物件,將可以得到此次新增的動作是否有成功,這裡是使用這個物件內的 IsValid 這個屬性來判斷出來。
這個方法將會使用這個 URL PUT: /blogs/_doc/0
,透過 HTTP PUT 方法,呼叫遠端的 Elasticsearch 資料庫,並且在此次呼叫中,使用這個 Payload {"blogId":0,"title":"Nice to meet your 0","content":"Hello Elasticsearch 0","createAt":"2023-12-11T10:11:27.0857254+08:00","updateAt":"2023-12-11T10:11:27.0857261+08:00"}
告知 Elasticsearch 伺服器,這是要新增到 [blogs] 這個索引內的文件。
底下將會是呼叫完成這個方法之後,得到的執行結果內容,這裡將會得到一個 JSON 物件
{
"_index":"blogs",
"_id":"0",
"_version":1,
"result":"created",
"_shards":{
"total":2,
"successful":2,
"failed":0
},
"_seq_no":0,
"_primary_term":1
}
這個 JSON 表示在 Elasticsearch 索引「blogs」中成功創建了一個 ID 為「0」的文檔。
- 「_index」 是索引的名稱。索引是 Elasticsearch 中用於存儲文檔的容器。
- 「_id」 是文檔的唯一識別符。它由 Elasticsearch 生成,並且通常是整數或字符串。
- 「_version」 表示文檔的當前版本。新創建的文檔版本為 1,每次更新文檔時版本號會遞增。這有助於 Elasticsearch 跟踪文檔的更改。
- 「result」 是文檔創建操作的結果。在本例中,結果為「created」,表示操作成功。
- 「_shards」 提供有關 Elasticsearch 分片架構的信息。分片是用於提高 Elasticsearch 可擴展性和可用性的技術。
- 「_seq_no」 是文檔在其分片中的序列號。它用於跟踪文檔的更改。
- 「_primary_term」 指示文檔所在的主分片的術語。術語是用於跟踪文檔歷史記錄的技術。
總而言之,這個 JSON 表示文檔已成功索引並可供搜索和檢索。
當完成呼叫 100 次的新增 API 呼叫之後,將會透過 Console.WriteLine($"新增 100 次文件需要 {stopwatch.ElapsedMilliseconds} ms");
敘述,得到這次動作總共耗時多久。
接下來將會需要設計 [一次新增 100 筆文件] 這樣的需求
這裡首先會宣告一個集合物件 List<Blog> list = new List<Blog>();
,接著,設計一個迴圈來執行 100 次動作,每次將會建立一個 [Blog] 物件,並且儲存到 [list] 集合物件內。一旦,建立完成這個集合物件內的 100 個 [Blog] 物件之後,便可以呼叫 var response2 = await client.BulkAsync(b => b.Index(indexName).IndexMany(list));
這樣敘述,使用 [BulkAsync] 方法來一次新增到索引內。
在這裡將會使用這個 URL POST: /blogs/_bulk
來呼叫 Elasticsearch API ,並且使用底下的 Payload 來將這 100 筆紀錄傳送過去
{
"index":{
"_id":"10000"
}
}{
"blogId":10000,
"title":"Nice to meet your (Bulk) 10000",
"content":"Hello Elasticsearch (Bulk) 10000",
"createAt":"2023-12-11T11:24:49.0384907+08:00",
"updateAt":"2023-12-11T11:24:49.0384909+08:00"
}{
"index":{
"_id":"10001"
}
}{
"blogId":10001,
"title":"Nice to meet your (Bulk) 10001",
"content":"Hello Elasticsearch (Bulk) 10001",
"createAt":"2023-12-12T11:24:49.0384924+08:00",
"updateAt":"2023-12-12T11:24:49.0384924+08:00"
}
...
{
"index":{
"_id":"10099"
}
}{
"blogId":10099,
"title":"Nice to meet your (Bulk) 10099",
"content":"Hello Elasticsearch (Bulk) 10099",
"createAt":"2024-03-19T11:24:49.0385283+08:00",
"updateAt":"2024-03-19T11:24:49.0385284+08:00"
}
當這個方法執行完成之後,Elasticsearch API 將會回傳底下的 JSON 內容
{
"errors":false,
"took":6,
"items":[
{
"index":{
"_index":"blogs",
"_id":"10000",
"_version":1,
"result":"created",
"_shards":{
"total":2,
"successful":2,
"failed":0
},
"_seq_no":36,
"_primary_term":1,
"status":201
}
},
{
"index":{
"_index":"blogs",
"_id":"10001",
"_version":1,
"result":"created",
"_shards":{
"total":2,
"successful":2,
"failed":0
},
"_seq_no":31,
"_primary_term":1,
"status":201
}
},
{
"index":{
"_index":"blogs",
"_id":"10002",
"_version":1,
"result":"created",
"_shards":{
"total":2,
"successful":2,
"failed":0
},
"_seq_no":32,
"_primary_term":1,
"status":201
}
},
...
{
"index":{
"_index":"blogs",
"_id":"10099",
"_version":1,
"result":"created",
"_shards":{
"total":2,
"successful":2,
"failed":0
},
"_seq_no":67,
"_primary_term":1,
"status":201
}
}
]
}
這個 JSON 表示 Elasticsearch 成功地將 100 個文件索引到「blogs」索引中。 每個文件都有唯一的 ID,從 10000 到 10099。
以下是 JSON 中各個字段的含義:
- _index: 索引的名稱,在本例中是「blogs」。
- _id: 文檔的唯一標識符。
- _version: 文檔的當前版本,新創建的文檔版本為 1,每次更新文件時版本號會遞增。
- result: 文檔創建操作的結果,在本例中,結果為「created」,表示操作成功。
- _shards: 提供有關 Elasticsearch 分片架構的資訊。 在本例中,每個文檔都複製到兩個分片,並且沒有失敗。
- _seq_no: 文檔在其分片中的序列號。
- _primary_term: 指示文檔所在的主分片的術語。
- status: 文檔創建操作的狀態碼,在本例中,狀態碼為 201,表示操作成功。
總而言之,這個 JSON 表明所有文檔都已成功索引並可供搜索和檢索。
而在 Elasticsearch 中,"took" 欄位表示執行操作所花費的時間,以毫秒為單位。 在本例中,"took":6 表示將 100 個檔索引到「blogs」索引所花費的時間為 6 毫秒。
"took" 欄位可用於監控 Elasticsearch 的性能。 如果 「took」 值很高,則可能表明 Elasticsearch 正在處理大量數據或存在性能瓶頸。
在本例中,"took" 值為 6 毫秒,表明 Elasticsearch 能夠以非常快的速度處理大量數據。
執行程式碼
- 按下
F5
鍵,開始執行這個程式 - 請觀察 Console 視窗內的內容
- 對於 [每次新增一筆文件,共 100 次] 這樣動作,將會耗時 3284 ms
- 對於 [一次新增 100 筆文件] 這樣動作,將會耗時 120 ms
新增 100 次文件需要 3284 ms
新增 100 次文件需要 120 ms
沒有留言:
張貼留言