Skip to content

Commit 09b4f7c

Browse files
committed
remove old style file streaming call
1 parent 5d0b442 commit 09b4f7c

5 files changed

Lines changed: 0 additions & 166 deletions

File tree

frontend/app/store/wshclientapi.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -774,12 +774,6 @@ export class RpcApiType {
774774
return client.wshRpcStream("remotestreamcpudata", null, opts);
775775
}
776776

777-
// command "remotestreamfile" [responsestream]
778-
RemoteStreamFileCommand(client: WshClient, data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator<FileData, void, boolean> {
779-
if (this.mockClient) return this.mockClient.mockWshRpcStream(client, "remotestreamfile", data, opts);
780-
return client.wshRpcStream("remotestreamfile", data, opts);
781-
}
782-
783777
// command "remoteterminatejobmanager" [call]
784778
RemoteTerminateJobManagerCommand(client: WshClient, data: CommandRemoteTerminateJobManagerData, opts?: RpcOpts): Promise<void> {
785779
if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "remoteterminatejobmanager", data, opts);

frontend/types/gotypes.d.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -576,12 +576,6 @@ declare global {
576576
publickeybase64: string;
577577
};
578578

579-
// wshrpc.CommandRemoteStreamFileData
580-
type CommandRemoteStreamFileData = {
581-
path: string;
582-
byterange?: string;
583-
};
584-
585579
// wshrpc.CommandRemoteTerminateJobManagerData
586580
type CommandRemoteTerminateJobManagerData = {
587581
jobid: string;

pkg/wshrpc/wshclient/wshclient.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -770,11 +770,6 @@ func RemoteStreamCpuDataCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) chan ws
770770
return sendRpcRequestResponseStreamHelper[wshrpc.TimeSeriesData](w, "remotestreamcpudata", nil, opts)
771771
}
772772

773-
// command "remotestreamfile", wshserver.RemoteStreamFileCommand
774-
func RemoteStreamFileCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteStreamFileData, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
775-
return sendRpcRequestResponseStreamHelper[wshrpc.FileData](w, "remotestreamfile", data, opts)
776-
}
777-
778773
// command "remoteterminatejobmanager", wshserver.RemoteTerminateJobManagerCommand
779774
func RemoteTerminateJobManagerCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteTerminateJobManagerData, opts *wshrpc.RpcOpts) error {
780775
_, err := sendRpcRequestCallHelper[any](w, "remoteterminatejobmanager", data, opts)

pkg/wshrpc/wshremote/wshremote_file.go

Lines changed: 0 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -31,152 +31,6 @@ const RemoteFileTransferSizeLimit = 32 * 1024 * 1024
3131

3232
var DisableRecursiveFileOpts = true
3333

34-
func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error {
35-
innerFilesEntries, err := os.ReadDir(path)
36-
if err != nil {
37-
return fmt.Errorf("cannot open dir %q: %w", path, err)
38-
}
39-
if byteRange.All {
40-
if len(innerFilesEntries) > wshrpc.MaxDirSize {
41-
innerFilesEntries = innerFilesEntries[:wshrpc.MaxDirSize]
42-
}
43-
} else {
44-
if byteRange.Start < int64(len(innerFilesEntries)) {
45-
var realEnd int64
46-
if byteRange.OpenEnd {
47-
realEnd = int64(len(innerFilesEntries))
48-
} else {
49-
realEnd = byteRange.End + 1
50-
if realEnd > int64(len(innerFilesEntries)) {
51-
realEnd = int64(len(innerFilesEntries))
52-
}
53-
}
54-
innerFilesEntries = innerFilesEntries[byteRange.Start:realEnd]
55-
} else {
56-
innerFilesEntries = []os.DirEntry{}
57-
}
58-
}
59-
var fileInfoArr []*wshrpc.FileInfo
60-
for _, innerFileEntry := range innerFilesEntries {
61-
if ctx.Err() != nil {
62-
return ctx.Err()
63-
}
64-
innerFileInfoInt, err := innerFileEntry.Info()
65-
if err != nil {
66-
continue
67-
}
68-
innerFileInfo := statToFileInfo(filepath.Join(path, innerFileInfoInt.Name()), innerFileInfoInt, false)
69-
fileInfoArr = append(fileInfoArr, innerFileInfo)
70-
if len(fileInfoArr) >= wshrpc.DirChunkSize {
71-
dataCallback(fileInfoArr, nil, byteRange)
72-
fileInfoArr = nil
73-
}
74-
}
75-
if len(fileInfoArr) > 0 {
76-
dataCallback(fileInfoArr, nil, byteRange)
77-
}
78-
return nil
79-
}
80-
81-
func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error {
82-
fd, err := os.Open(path)
83-
if err != nil {
84-
return fmt.Errorf("cannot open file %q: %w", path, err)
85-
}
86-
defer utilfn.GracefulClose(fd, "remoteStreamFileRegular", path)
87-
var filePos int64
88-
if !byteRange.All && byteRange.Start > 0 {
89-
_, err := fd.Seek(byteRange.Start, io.SeekStart)
90-
if err != nil {
91-
return fmt.Errorf("seeking file %q: %w", path, err)
92-
}
93-
filePos = byteRange.Start
94-
}
95-
buf := make([]byte, wshrpc.FileChunkSize)
96-
for {
97-
if ctx.Err() != nil {
98-
return ctx.Err()
99-
}
100-
n, err := fd.Read(buf)
101-
if n > 0 {
102-
if !byteRange.All && !byteRange.OpenEnd && filePos+int64(n) > byteRange.End+1 {
103-
n = int(byteRange.End + 1 - filePos)
104-
}
105-
filePos += int64(n)
106-
dataCallback(nil, buf[:n], byteRange)
107-
}
108-
if !byteRange.All && !byteRange.OpenEnd && filePos >= byteRange.End+1 {
109-
break
110-
}
111-
if errors.Is(err, io.EOF) {
112-
break
113-
}
114-
if err != nil {
115-
return fmt.Errorf("reading file %q: %w", path, err)
116-
}
117-
}
118-
return nil
119-
}
120-
121-
func (impl *ServerImpl) remoteStreamFileInternal(ctx context.Context, data wshrpc.CommandRemoteStreamFileData, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error {
122-
byteRange, err := fileutil.ParseByteRange(data.ByteRange)
123-
if err != nil {
124-
return err
125-
}
126-
path, err := wavebase.ExpandHomeDir(data.Path)
127-
if err != nil {
128-
return err
129-
}
130-
finfo, err := impl.fileInfoInternal(path, true)
131-
if err != nil {
132-
return fmt.Errorf("cannot stat file %q: %w", path, err)
133-
}
134-
dataCallback([]*wshrpc.FileInfo{finfo}, nil, byteRange)
135-
if finfo.NotFound {
136-
return nil
137-
}
138-
if finfo.IsDir {
139-
return impl.remoteStreamFileDir(ctx, path, byteRange, dataCallback)
140-
} else {
141-
if finfo.Size > RemoteFileTransferSizeLimit {
142-
return fmt.Errorf("file %q size %d exceeds transfer limit of %d bytes", path, finfo.Size, RemoteFileTransferSizeLimit)
143-
}
144-
return impl.remoteStreamFileRegular(ctx, path, byteRange, dataCallback)
145-
}
146-
}
147-
148-
func (impl *ServerImpl) RemoteStreamFileCommand(ctx context.Context, data wshrpc.CommandRemoteStreamFileData) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
149-
ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.FileData], 16)
150-
go func() {
151-
defer func() {
152-
panichandler.PanicHandler("RemoteStreamFileCommand", recover())
153-
}()
154-
defer close(ch)
155-
firstPk := true
156-
err := impl.remoteStreamFileInternal(ctx, data, func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType) {
157-
resp := wshrpc.FileData{}
158-
fileInfoLen := len(fileInfo)
159-
if fileInfoLen > 1 || !firstPk {
160-
resp.Entries = fileInfo
161-
} else if fileInfoLen == 1 {
162-
resp.Info = fileInfo[0]
163-
}
164-
if firstPk {
165-
firstPk = false
166-
}
167-
if len(data) > 0 {
168-
resp.Data64 = base64.StdEncoding.EncodeToString(data)
169-
resp.At = &wshrpc.FileDataAt{Offset: byteRange.Start, Size: len(data)}
170-
}
171-
ch <- wshrpc.RespOrErrorUnion[wshrpc.FileData]{Response: resp}
172-
})
173-
if err != nil {
174-
ch <- wshutil.RespErr[wshrpc.FileData](err)
175-
}
176-
}()
177-
return ch
178-
}
179-
18034
// prepareDestForCopy resolves the final destination path and handles overwrite logic.
18135
// destPath is the raw destination path (may be a directory or file path).
18236
// srcBaseName is the basename of the source file (used when dest is a directory or ends with slash).

pkg/wshrpc/wshrpctypes_file.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ type WshRpcFileInterface interface {
2929
}
3030

3131
type WshRpcRemoteFileInterface interface {
32-
// old streaming inferface
33-
RemoteStreamFileCommand(ctx context.Context, data CommandRemoteStreamFileData) chan RespOrErrorUnion[FileData]
34-
3532
// modern streaming interface
3633
RemoteFileStreamCommand(ctx context.Context, data CommandRemoteFileStreamData) (*FileInfo, error)
3734

0 commit comments

Comments
 (0)