Skip to content

Commit 5d0b442

Browse files
committed
remove old file streaming for modern for wshfs.Read() call
1 parent 65da1bd commit 5d0b442

5 files changed

Lines changed: 57 additions & 34 deletions

File tree

frontend/app/view/preview/preview-directory.tsx

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
import { ContextMenuModel } from "@/app/store/contextmenu";
5-
import { useWaveEnv } from "@/app/waveenv/waveenv";
65
import { globalStore } from "@/app/store/jotaiStore";
76
import { TabRpcClient } from "@/app/store/wshrpcutil";
7+
import { useWaveEnv } from "@/app/waveenv/waveenv";
88
import { checkKeyPressed, isCharacterKeyEvent } from "@/util/keyutil";
99
import { PLATFORM, PlatformMacOS } from "@/util/platformutil";
1010
import { addOpenMenuItems } from "@/util/previewutil";
@@ -112,7 +112,6 @@ function DirectoryTable({
112112
newDirectory,
113113
}: DirectoryTableProps) {
114114
const env = useWaveEnv<PreviewEnv>();
115-
const searchActive = useAtomValue(model.directorySearchActive);
116115
const fullConfig = useAtomValue(env.atoms.fullConfigAtom);
117116
const defaultSort = useAtomValue(env.getSettingsKeyAtom("preview:defaultsort")) ?? "name";
118117
const setErrorMsg = useSetAtom(model.errorMsgAtom);
@@ -587,28 +586,26 @@ function DirectoryPreview({ model }: DirectoryPreviewProps) {
587586
useEffect(
588587
() =>
589588
fireAndForget(async () => {
590-
let entries: FileInfo[];
589+
const entries: FileInfo[] = [];
591590
try {
592-
const file = await env.rpc.FileReadCommand(
593-
TabRpcClient,
594-
{
595-
info: {
596-
path: await model.formatRemoteUri(dirPath, globalStore.get),
597-
},
598-
},
599-
null
600-
);
601-
entries = file.entries ?? [];
602-
if (file?.info && file.info.dir && file.info?.path !== file.info?.dir) {
591+
const remotePath = await model.formatRemoteUri(dirPath, globalStore.get);
592+
const stream = env.rpc.FileListStreamCommand(TabRpcClient, { path: remotePath }, null);
593+
for await (const chunk of stream) {
594+
if (chunk?.fileinfo) {
595+
entries.push(...chunk.fileinfo);
596+
}
597+
}
598+
if (finfo?.dir && finfo?.path !== finfo?.dir) {
603599
entries.unshift({
604600
name: "..",
605-
path: file?.info?.dir,
601+
path: finfo.dir,
606602
isdir: true,
607603
modtime: new Date().getTime(),
608604
mimetype: "directory",
609605
});
610606
}
611607
} catch (e) {
608+
console.error("Directory Read Error", e);
612609
setErrorMsg({
613610
status: "Cannot Read Directory",
614611
text: `${e}`,

frontend/app/view/preview/previewenv.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export type PreviewEnv = WaveEnvSubset<{
1111
ConnEnsureCommand: WaveEnv["rpc"]["ConnEnsureCommand"];
1212
FileInfoCommand: WaveEnv["rpc"]["FileInfoCommand"];
1313
FileReadCommand: WaveEnv["rpc"]["FileReadCommand"];
14+
FileListStreamCommand: WaveEnv["rpc"]["FileListStreamCommand"];
1415
FileWriteCommand: WaveEnv["rpc"]["FileWriteCommand"];
1516
FileMoveCommand: WaveEnv["rpc"]["FileMoveCommand"];
1617
FileDeleteCommand: WaveEnv["rpc"]["FileDeleteCommand"];

pkg/remote/fileshare/fsutil/fsutil.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,3 @@ func ReadStreamToFileData(ctx context.Context, readCh <-chan wshrpc.RespOrErrorU
150150
}
151151
return fileData, nil
152152
}
153-
154-
func ReadFileStreamToWriter(ctx context.Context, readCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], writer io.Writer) error {
155-
return ReadFileStream(ctx, readCh, func(finfo wshrpc.FileInfo) {
156-
}, func(entries []*wshrpc.FileInfo) error {
157-
return nil
158-
}, func(data io.Reader) error {
159-
_, err := io.Copy(writer, data)
160-
return err
161-
})
162-
}

pkg/remote/fileshare/wshfs/wshfs.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2025, Command Line Inc.
1+
// Copyright 2026, Command Line Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

44
package wshfs
@@ -7,12 +7,12 @@ import (
77
"context"
88
"encoding/base64"
99
"fmt"
10+
"io"
1011
"log"
1112
"os"
1213
"time"
1314

1415
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
15-
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
1616
"github.com/wavetermdev/waveterm/pkg/wshrpc"
1717
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
1818
"github.com/wavetermdev/waveterm/pkg/wshutil"
@@ -41,22 +41,54 @@ func parseConnection(ctx context.Context, path string) (*connparse.Connection, e
4141
}
4242

4343
func Read(ctx context.Context, data wshrpc.FileData) (*wshrpc.FileData, error) {
44+
if data.Info == nil {
45+
return nil, fmt.Errorf("file info is required")
46+
}
4447
log.Printf("Read: %v", data.Info.Path)
4548
conn, err := parseConnection(ctx, data.Info.Path)
4649
if err != nil {
4750
return nil, err
4851
}
49-
rtnCh := readStream(conn, data)
50-
return fsutil.ReadStreamToFileData(ctx, rtnCh)
51-
}
52-
53-
func readStream(conn *connparse.Connection, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
52+
broker := RpcClient.StreamBroker
53+
if broker == nil {
54+
return nil, fmt.Errorf("stream broker not available")
55+
}
56+
if RpcClientRouteId == "" {
57+
return nil, fmt.Errorf("no route id available")
58+
}
59+
readerRouteId := RpcClientRouteId
60+
writerRouteId := wshutil.MakeConnectionRouteId(conn.Host)
61+
reader, streamMeta := broker.CreateStreamReader(readerRouteId, writerRouteId, 256*1024)
62+
defer reader.Close()
63+
go func() {
64+
<-ctx.Done()
65+
reader.Close()
66+
}()
5467
byteRange := ""
5568
if data.At != nil && data.At.Size > 0 {
5669
byteRange = fmt.Sprintf("%d-%d", data.At.Offset, data.At.Offset+int64(data.At.Size)-1)
5770
}
58-
streamFileData := wshrpc.CommandRemoteStreamFileData{Path: conn.Path, ByteRange: byteRange}
59-
return wshclient.RemoteStreamFileCommand(RpcClient, streamFileData, &wshrpc.RpcOpts{Route: wshutil.MakeConnectionRouteId(conn.Host)})
71+
remoteData := wshrpc.CommandRemoteFileStreamData{
72+
Path: conn.Path,
73+
ByteRange: byteRange,
74+
StreamMeta: *streamMeta,
75+
}
76+
fileInfo, err := wshclient.RemoteFileStreamCommand(RpcClient, remoteData, &wshrpc.RpcOpts{Route: writerRouteId})
77+
if err != nil {
78+
return nil, fmt.Errorf("starting remote file stream: %w", err)
79+
}
80+
var rawData []byte
81+
if fileInfo != nil && !fileInfo.IsDir {
82+
rawData, err = io.ReadAll(reader)
83+
if err != nil {
84+
return nil, fmt.Errorf("reading file stream: %w", err)
85+
}
86+
}
87+
rtnData := &wshrpc.FileData{Info: fileInfo}
88+
if len(rawData) > 0 {
89+
rtnData.Data64 = base64.StdEncoding.EncodeToString(rawData)
90+
}
91+
return rtnData, nil
6092
}
6193

6294
func GetConnectionRouteId(ctx context.Context, path string) (string, error) {

pkg/wshrpc/wshremote/wshremote_file.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,9 @@ func (impl *ServerImpl) RemoteListEntriesCommand(ctx context.Context, data wshrp
357357
ch <- wshutil.RespErr[wshrpc.CommandRemoteListEntriesRtnData](err)
358358
return
359359
}
360+
if data.Opts == nil {
361+
data.Opts = &wshrpc.FileListOpts{}
362+
}
360363
innerFilesEntries := []os.DirEntry{}
361364
seen := 0
362365
if data.Opts.Limit == 0 {

0 commit comments

Comments
 (0)