Merge branch '21128-toolbar-context-menu'
[arvados-workbench2.git] / src / store / process-logs-panel / process-logs-panel-actions.ts
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 import { unionize, ofType, UnionOf } from "common/unionize";
6 import { ProcessLogs } from './process-logs-panel';
7 import { LogEventType } from 'models/log';
8 import { RootState } from 'store/store';
9 import { ServiceRepository } from 'services/services';
10 import { Dispatch } from 'redux';
11 import { LogFragment, LogService, logFileToLogType } from 'services/log-service/log-service';
12 import { Process, getProcess } from 'store/processes/process';
13 import { navigateTo } from 'store/navigation/navigation-action';
14 import { snackbarActions, SnackbarKind } from 'store/snackbar/snackbar-actions';
15 import { CollectionFile, CollectionFileType } from "models/collection-file";
16 import { ContainerRequestResource, ContainerRequestState } from "models/container-request";
17
18 const SNIPLINE = `================ ✀ ================ ✀ ========= Some log(s) were skipped ========= ✀ ================ ✀ ================`;
19 const LOG_TIMESTAMP_PATTERN = /^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{9}Z/;
20
21 export const processLogsPanelActions = unionize({
22     RESET_PROCESS_LOGS_PANEL: ofType<{}>(),
23     INIT_PROCESS_LOGS_PANEL: ofType<{ filters: string[], logs: ProcessLogs }>(),
24     SET_PROCESS_LOGS_PANEL_FILTER: ofType<string>(),
25     ADD_PROCESS_LOGS_PANEL_ITEM: ofType<ProcessLogs>(),
26 });
27
28 // Max size of logs to fetch in bytes
29 const maxLogFetchSize: number = 128 * 1000;
30
31 type FileWithProgress = {
32     file: CollectionFile;
33     lastByte: number;
34 }
35
36 type SortableLine = {
37     logType: LogEventType,
38     timestamp: string;
39     contents: string;
40 }
41
42 export type ProcessLogsPanelAction = UnionOf<typeof processLogsPanelActions>;
43
44 export const setProcessLogsPanelFilter = (filter: string) =>
45     processLogsPanelActions.SET_PROCESS_LOGS_PANEL_FILTER(filter);
46
47 export const initProcessLogsPanel = (processUuid: string) =>
48     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
49         let process: Process | undefined;
50         try {
51             dispatch(processLogsPanelActions.RESET_PROCESS_LOGS_PANEL());
52             process = getProcess(processUuid)(getState().resources);
53             if (process?.containerRequest?.uuid) {
54                 // Get log file size info
55                 const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
56
57                 // Populate lastbyte 0 for each file
58                 const filesWithProgress = logFiles.map((file) => ({ file, lastByte: 0 }));
59
60                 // Fetch array of LogFragments
61                 const logLines = await loadContainerLogFileContents(filesWithProgress, logService, process);
62
63                 // Populate initial state with filters
64                 const initialState = createInitialLogPanelState(logFiles, logLines);
65                 dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
66             }
67         } catch (e) {
68             // On error, populate empty state to allow polling to start
69             const initialState = createInitialLogPanelState([], []);
70             dispatch(processLogsPanelActions.INIT_PROCESS_LOGS_PANEL(initialState));
71             // Only show toast on errors other than 404 since 404 is expected when logs do not exist yet
72             if (e.status !== 404) {
73                 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Error loading process logs', hideDuration: 4000, kind: SnackbarKind.ERROR }));
74             }
75             if (e.status === 404 && process?.containerRequest.state === ContainerRequestState.FINAL) {
76                 dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
77             }
78         }
79     };
80
81 export const pollProcessLogs = (processUuid: string) =>
82     async (dispatch: Dispatch, getState: () => RootState, { logService }: ServiceRepository) => {
83         try {
84             // Get log panel state and process from store
85             const currentState = getState().processLogsPanel;
86             const process = getProcess(processUuid)(getState().resources);
87
88             // Check if container request is present and initial logs state loaded
89             if (process?.containerRequest?.uuid && Object.keys(currentState.logs).length > 0) {
90                 const logFiles = await loadContainerLogFileList(process.containerRequest, logService);
91
92                 // Determine byte to fetch from while filtering unchanged files
93                 const filesToUpdateWithProgress = logFiles.reduce((acc, updatedFile) => {
94                     // Fetch last byte or 0 for new log files
95                     const currentStateLogLastByte = currentState.logs[logFileToLogType(updatedFile)]?.lastByte || 0;
96
97                     const isNew = !Object.keys(currentState.logs).find((currentStateLogName) => (updatedFile.name.startsWith(currentStateLogName)));
98                     const isChanged = !isNew && currentStateLogLastByte < updatedFile.size;
99
100                     if (isNew || isChanged) {
101                         return acc.concat({ file: updatedFile, lastByte: currentStateLogLastByte });
102                     } else {
103                         return acc;
104                     }
105                 }, [] as FileWithProgress[]);
106
107                 // Perform range request(s) for each file
108                 const logFragments = await loadContainerLogFileContents(filesToUpdateWithProgress, logService, process);
109
110                 if (logFragments.length) {
111                     // Convert LogFragments to ProcessLogs with All/Main sorting & line-merging
112                     const groupedLogs = groupLogs(logFiles, logFragments);
113                     await dispatch(processLogsPanelActions.ADD_PROCESS_LOGS_PANEL_ITEM(groupedLogs));
114                 }
115             }
116             return Promise.resolve();
117         } catch (e) {
118             // Remove log when polling error is handled in some way instead of being ignored
119             console.error("Error occurred in pollProcessLogs:", e);
120             return Promise.reject();
121         }
122     };
123
124 const loadContainerLogFileList = async (containerRequest: ContainerRequestResource, logService: LogService) => {
125     const logCollectionContents = await logService.listLogFiles(containerRequest);
126
127     // Filter only root directory files matching log event types which have bytes
128     return logCollectionContents.filter((file): file is CollectionFile => (
129         file.type === CollectionFileType.FILE &&
130         PROCESS_PANEL_LOG_EVENT_TYPES.indexOf(logFileToLogType(file)) > -1 &&
131         file.size > 0
132     ));
133 };
134
135 /**
136  * Loads the contents of each file from each file's lastByte simultaneously
137  *   while respecting the maxLogFetchSize by requesting the start and end
138  *   of the desired block and inserting a snipline.
139  * @param logFilesWithProgress CollectionFiles with the last byte previously loaded
140  * @param logService
141  * @param process
142  * @returns LogFragment[] containing a single LogFragment corresponding to each input file
143  */
144 const loadContainerLogFileContents = async (logFilesWithProgress: FileWithProgress[], logService: LogService, process: Process) => (
145     (await Promise.allSettled(logFilesWithProgress.filter(({ file }) => file.size > 0).map(({ file, lastByte }) => {
146         const requestSize = file.size - lastByte;
147         if (requestSize > maxLogFetchSize) {
148             const chunkSize = Math.floor(maxLogFetchSize / 2);
149             const firstChunkEnd = lastByte + chunkSize - 1;
150             return Promise.all([
151                 logService.getLogFileContents(process.containerRequest, file, lastByte, firstChunkEnd),
152                 logService.getLogFileContents(process.containerRequest, file, file.size - chunkSize, file.size - 1)
153             ] as Promise<(LogFragment)>[]);
154         } else {
155             return Promise.all([logService.getLogFileContents(process.containerRequest, file, lastByte, file.size - 1)]);
156         }
157     })).then((res) => {
158         if (res.length && res.every(promiseResult => (promiseResult.status === 'rejected'))) {
159             // Since allSettled does not pass promise rejection we throw an
160             //   error if every request failed
161             const error = res.find(
162                 (promiseResult): promiseResult is PromiseRejectedResult => promiseResult.status === 'rejected'
163             )?.reason;
164             return Promise.reject(error);
165         }
166         return res.filter((promiseResult): promiseResult is PromiseFulfilledResult<LogFragment[]> => (
167             // Filter out log files with rejected promises
168             //   (Promise.all rejects on any failure)
169             promiseResult.status === 'fulfilled' &&
170             // Filter out files where any fragment is empty
171             //   (prevent incorrect snipline generation or an un-resumable situation)
172             !!promiseResult.value.every(logFragment => logFragment.contents.length)
173         )).map(one => one.value)
174     })).map((logResponseSet) => {
175         // For any multi fragment response set, modify the last line of non-final chunks to include a line break and snip line
176         //   Don't add snip line as a separate line so that sorting won't reorder it
177         for (let i = 1; i < logResponseSet.length; i++) {
178             const fragment = logResponseSet[i - 1];
179             const lastLineIndex = fragment.contents.length - 1;
180             const lastLineContents = fragment.contents[lastLineIndex];
181             const newLastLine = `${lastLineContents}\n${SNIPLINE}`;
182
183             logResponseSet[i - 1].contents[lastLineIndex] = newLastLine;
184         }
185
186         // Merge LogFragment Array (representing multiple log line arrays) into single LogLine[] / LogFragment
187         return logResponseSet.reduce((acc, curr: LogFragment) => ({
188             logType: curr.logType,
189             contents: [...(acc.contents || []), ...curr.contents]
190         }), {} as LogFragment);
191     })
192 );
193
194 const createInitialLogPanelState = (logFiles: CollectionFile[], logFragments: LogFragment[]): { filters: string[], logs: ProcessLogs } => {
195     const logs = groupLogs(logFiles, logFragments);
196     const filters = Object.keys(logs);
197     return { filters, logs };
198 }
199
200 /**
201  * Converts LogFragments into ProcessLogs, grouping and sorting All/Main logs
202  * @param logFiles
203  * @param logFragments
204  * @returns ProcessLogs for the store
205  */
206 const groupLogs = (logFiles: CollectionFile[], logFragments: LogFragment[]): ProcessLogs => {
207     const sortableLogFragments = mergeMultilineLoglines(logFragments);
208
209     const allLogs = mergeSortLogFragments(sortableLogFragments);
210     const mainLogs = mergeSortLogFragments(sortableLogFragments.filter((fragment) => (MAIN_EVENT_TYPES.includes(fragment.logType))));
211
212     const groupedLogs = logFragments.reduce((grouped, fragment) => ({
213         ...grouped,
214         [fragment.logType as string]: { lastByte: fetchLastByteNumber(logFiles, fragment.logType), contents: fragment.contents }
215     }), {});
216
217     return {
218         [MAIN_FILTER_TYPE]: { lastByte: undefined, contents: mainLogs },
219         [ALL_FILTER_TYPE]: { lastByte: undefined, contents: allLogs },
220         ...groupedLogs,
221     }
222 };
223
224 /**
225  * Checks for non-timestamped log lines and merges them with the previous line, assumes they are multi-line logs
226  *   If there is no previous line (first line has no timestamp), the line is deleted.
227  *   Only used for combined logs that need sorting by timestamp after merging
228  * @param logFragments
229  * @returns Modified LogFragment[]
230  */
231 const mergeMultilineLoglines = (logFragments: LogFragment[]) => (
232     logFragments.map((fragment) => {
233         // Avoid altering the original fragment copy
234         let fragmentCopy: LogFragment = {
235             logType: fragment.logType,
236             contents: [...fragment.contents],
237         }
238         // Merge any non-timestamped lines in sortable log types with previous line
239         if (fragmentCopy.contents.length && !NON_SORTED_LOG_TYPES.includes(fragmentCopy.logType)) {
240             for (let i = 0; i < fragmentCopy.contents.length; i++) {
241                 const lineContents = fragmentCopy.contents[i];
242                 if (!lineContents.match(LOG_TIMESTAMP_PATTERN)) {
243                     // Partial line without timestamp detected
244                     if (i > 0) {
245                         // If not first line, copy line to previous line
246                         const previousLineContents = fragmentCopy.contents[i - 1];
247                         const newPreviousLineContents = `${previousLineContents}\n${lineContents}`;
248                         fragmentCopy.contents[i - 1] = newPreviousLineContents;
249                     }
250                     // Delete the current line and prevent iterating
251                     fragmentCopy.contents.splice(i, 1);
252                     i--;
253                 }
254             }
255         }
256         return fragmentCopy;
257     })
258 );
259
260 /**
261  * Merges log lines of different types and sorts types that contain timestamps (are sortable)
262  * @param logFragments
263  * @returns string[] of merged and sorted log lines
264  */
265 const mergeSortLogFragments = (logFragments: LogFragment[]): string[] => {
266     const sortableFragments = logFragments
267         .filter((fragment) => (!NON_SORTED_LOG_TYPES.includes(fragment.logType)));
268
269     const nonSortableLines = fragmentsToLines(logFragments
270         .filter((fragment) => (NON_SORTED_LOG_TYPES.includes(fragment.logType)))
271         .sort((a, b) => (a.logType.localeCompare(b.logType))));
272
273     return [...nonSortableLines, ...sortLogFragments(sortableFragments)];
274 };
275
276 /**
277  * Performs merge and sort of input log fragment lines
278  * @param logFragments set of sortable log fragments to be merged and sorted
279  * @returns A string array containing all lines, sorted by timestamp and
280  *          preserving line ordering and type grouping when timestamps match
281  */
282 const sortLogFragments = (logFragments: LogFragment[]): string[] => {
283     const linesWithType: SortableLine[] = logFragments
284         // Map each logFragment into an array of SortableLine
285         .map((fragment: LogFragment): SortableLine[] => (
286             fragment.contents.map((singleLine: string) => {
287                 const timestampMatch = singleLine.match(LOG_TIMESTAMP_PATTERN);
288                 const timestamp = timestampMatch && timestampMatch[0] ? timestampMatch[0] : "";
289                 return {
290                     logType: fragment.logType,
291                     timestamp: timestamp,
292                     contents: singleLine,
293                 };
294             })
295         // Merge each array of SortableLine into single array
296         )).reduce((acc: SortableLine[], lines: SortableLine[]) => (
297             [...acc, ...lines]
298         ), [] as SortableLine[]);
299
300     return linesWithType
301         .sort(sortableLineSortFunc)
302         .map(lineWithType => lineWithType.contents);
303 };
304
305 /**
306  * Sort func to sort lines
307  *   Preserves original ordering of lines from the same source
308  *   Stably orders lines of differing type but same timestamp
309  *     (produces a block of same-timestamped lines of one type before a block
310  *     of same timestamped lines of another type for readability)
311  *   Sorts all other lines by contents (ie by timestamp)
312  */
313 const sortableLineSortFunc = (a: SortableLine, b: SortableLine) => {
314     if (a.logType === b.logType) {
315         return 0;
316     } else if (a.timestamp === b.timestamp) {
317         return a.logType.localeCompare(b.logType);
318     } else {
319         return a.contents.localeCompare(b.contents);
320     }
321 };
322
323 const fragmentsToLines = (fragments: LogFragment[]): string[] => (
324     fragments.reduce((acc, fragment: LogFragment) => (
325         acc.concat(...fragment.contents)
326     ), [] as string[])
327 );
328
329 const fetchLastByteNumber = (logFiles: CollectionFile[], key: string) => {
330     return logFiles.find((file) => (file.name.startsWith(key)))?.size
331 };
332
333 export const navigateToLogCollection = (uuid: string) =>
334     async (dispatch: Dispatch<any>, getState: () => RootState, services: ServiceRepository) => {
335         try {
336             await services.collectionService.get(uuid);
337             dispatch<any>(navigateTo(uuid));
338         } catch {
339             dispatch(snackbarActions.OPEN_SNACKBAR({ message: 'Log collection was trashed or deleted.', hideDuration: 4000, kind: SnackbarKind.WARNING }));
340         }
341     };
342
343 const ALL_FILTER_TYPE = 'All logs';
344
345 const MAIN_FILTER_TYPE = 'Main logs';
346 const MAIN_EVENT_TYPES = [
347     LogEventType.CRUNCH_RUN,
348     LogEventType.STDERR,
349     LogEventType.STDOUT,
350 ];
351
352 const PROCESS_PANEL_LOG_EVENT_TYPES = [
353     LogEventType.ARV_MOUNT,
354     LogEventType.CRUNCH_RUN,
355     LogEventType.CRUNCHSTAT,
356     LogEventType.DISPATCH,
357     LogEventType.HOSTSTAT,
358     LogEventType.NODE_INFO,
359     LogEventType.STDERR,
360     LogEventType.STDOUT,
361     LogEventType.CONTAINER,
362     LogEventType.KEEPSTORE,
363 ];
364
365 const NON_SORTED_LOG_TYPES = [
366     LogEventType.NODE_INFO,
367     LogEventType.CONTAINER,
368 ];